Add godoc for some kubelet funcs
commit b9f0e8c610
parent 4a0e0826e5
@@ -115,6 +115,7 @@ const (
// Period for performing global cleanup tasks.
housekeepingPeriod = time.Second * 2

// The path in containers' filesystems where the hosts file is mounted.
etcHostsPath = "/etc/hosts"

// Capacity of the channel for receiving pod lifecycle events. This number
@@ -131,7 +132,7 @@ const (
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1

// backOffPeriod is the period to back off when pod syncing resulting in an
// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
// container restarts and image pulls.
backOffPeriod = time.Second * 10
@@ -494,6 +495,8 @@ func NewMainKubelet(
return klet, nil
}

// effectiveHairpinMode determines the effective hairpin mode given the
// configured mode, container runtime, and whether cbr0 should be configured.
func effectiveHairpinMode(hairpinMode componentconfig.HairpinMode, containerRuntime string, configureCBR0 bool) (componentconfig.HairpinMode, error) {
// The hairpin mode setting doesn't matter if:
// - We're not using a bridge network. This is hard to check because we might
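The new godoc above describes effectiveHairpinMode as a normalization step over the configured hairpin mode. As a rough illustration of that kind of fall-back logic (not the kubelet's actual implementation; the mode names and the docker/cbr0 conditions below are assumptions), such a helper might look like:

// Hypothetical sketch of hairpin-mode normalization; constants and fallback
// conditions are assumptions, not the real kubelet logic.
package main

import "fmt"

type HairpinMode string

const (
	PromiscuousBridge HairpinMode = "promiscuous-bridge" // assumed name
	HairpinVeth       HairpinMode = "hairpin-veth"       // assumed name
	HairpinNone       HairpinMode = "none"               // assumed name
)

// effectiveHairpinModeSketch falls back to hairpin-veth when the
// promiscuous-bridge mode cannot apply (non-docker runtime or no cbr0).
func effectiveHairpinModeSketch(mode HairpinMode, runtime string, configureCBR0 bool) (HairpinMode, error) {
	switch mode {
	case PromiscuousBridge:
		if runtime != "docker" || !configureCBR0 {
			return HairpinVeth, nil // assumed fallback
		}
		return mode, nil
	case HairpinVeth, HairpinNone:
		return mode, nil
	default:
		return "", fmt.Errorf("unknown hairpin mode %q", mode)
	}
}

func main() {
	m, _ := effectiveHairpinModeSketch(PromiscuousBridge, "rkt", false)
	fmt.Println(m) // hairpin-veth
}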
@@ -542,14 +545,23 @@ type Kubelet struct {
runtimeCache kubecontainer.RuntimeCache
kubeClient clientset.Interface
rootDirectory string
podWorkers PodWorkers

// podWorkers handle syncing Pods in response to events.
podWorkers PodWorkers

// resyncInterval is the interval between periodic full reconciliations of
// pods on this node.
resyncInterval time.Duration
sourcesReady SourcesReadyFn

// sourcesReady is a function to call to determine if all config sources
// are ready.
sourcesReady SourcesReadyFn
// sourcesSeen records the sources seen by kubelet. This set is not thread
// safe and should only be accessed by the main kubelet syncloop goroutine.
sourcesSeen sets.String

// podManager is a facade that abstracts away the various sources of pods
// this Kubelet services.
podManager kubepod.Manager

// Needed to report events for containers belonging to deleted/modified pods.
@@ -582,10 +594,14 @@ type Kubelet struct {
// If non-nil, use this for container DNS server.
clusterDNS net.IP

// masterServiceNamespace is the namespace that the master service is exposed in.
masterServiceNamespace string
serviceLister serviceLister
nodeLister nodeLister
nodeInfo predicates.NodeInfo
// serviceLister knows how to list services
serviceLister serviceLister
// nodeLister knows how to list nodes
nodeLister nodeLister
// nodeInfo knows how to get information about the node for this kubelet.
nodeInfo predicates.NodeInfo

// a list of node labels to register
nodeLabels map[string]string
@@ -662,6 +678,7 @@ type Kubelet struct {
// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache

// os is a facade for various syscalls that need to be mocked during testing.
os kubecontainer.OSInterface

// Watcher of out of memory events.
@@ -796,11 +813,14 @@ func (kl *Kubelet) validateNodeIP() error {
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", kl.nodeIP.String())
}

// allSourcesReady returns whether all seen pod sources are ready.
func (kl *Kubelet) allSourcesReady() bool {
// Make a copy of the sourcesSeen list because it's not thread-safe.
return kl.sourcesReady(sets.NewString(kl.sourcesSeen.List()...))
}

// addSource adds a new pod source to the list of sources seen.
// TODO: reassess in the context of kubernetes/24810
func (kl *Kubelet) addSource(source string) {
kl.sourcesSeen.Insert(source)
}
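The comments added for sourcesSeen, allSourcesReady, and addSource describe a plain string set that is owned by the sync loop goroutine and copied before being handed to the readiness callback. A minimal sketch of that pattern, using a plain map instead of the k8s sets package (names here are illustrative):

package main

import "fmt"

// SourcesReadyFn reports whether all the given config sources are ready.
type SourcesReadyFn func(sources map[string]bool) bool

type loop struct {
	sourcesSeen  map[string]bool // not thread safe; owned by the sync loop goroutine
	sourcesReady SourcesReadyFn
}

func (l *loop) addSource(source string) { l.sourcesSeen[source] = true }

// allSourcesReady hands the callback a copy so the callee cannot observe
// later mutations of the loop-owned set.
func (l *loop) allSourcesReady() bool {
	copySet := make(map[string]bool, len(l.sourcesSeen))
	for s := range l.sourcesSeen {
		copySet[s] = true
	}
	return l.sourcesReady(copySet)
}

func main() {
	l := &loop{
		sourcesSeen: map[string]bool{},
		sourcesReady: func(seen map[string]bool) bool {
			return seen["api"] && seen["file"]
		},
	}
	l.addSource("api")
	fmt.Println(l.allSourcesReady()) // false
	l.addSource("file")
	fmt.Println(l.allSourcesReady()) // true
}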
@@ -840,6 +860,8 @@ func (kl *Kubelet) GetPodDir(podUID types.UID) string {
return kl.getPodDir(podUID)
}

// getPodDir returns the full path to the per-pod directory for the pod with
// the given UID.
func (kl *Kubelet) getPodDir(podUID types.UID) string {
// Backwards compat. The "old" stuff should be removed before 1.0
// release. The thinking here is this:
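getPodDir, per the new comment, maps a pod UID to that pod's data directory under the kubelet root directory. A tiny sketch of that mapping (the "pods" subdirectory name is an assumption for illustration, not taken from the diff):

package main

import (
	"fmt"
	"path/filepath"
)

// podDir joins the kubelet root directory with a per-pod subdirectory keyed
// by the pod UID. The "pods" segment is assumed here.
func podDir(rootDir, podUID string) string {
	return filepath.Join(rootDir, "pods", podUID)
}

func main() {
	fmt.Println(podDir("/var/lib/kubelet", "1234-abcd"))
	// /var/lib/kubelet/pods/1234-abcd
}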
@@ -911,6 +933,7 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
return newPath
}

// dirExists returns true if the path exists and represents a directory.
func dirExists(path string) bool {
s, err := os.Stat(path)
if err != nil {
@@ -919,6 +942,10 @@ func dirExists(path string) bool {
return s.IsDir()
}

// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
func (kl *Kubelet) setupDataDirs() error {
kl.rootDirectory = path.Clean(kl.rootDirectory)
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
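The setupDataDirs comment lists three directories the kubelet creates at startup. A self-contained sketch of that kind of setup (the "pods" and "plugins" subdirectory names under the root are assumptions):

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// setupDataDirs creates the root directory plus assumed "pods" and "plugins"
// subdirectories, mirroring the three steps listed in the godoc above.
func setupDataDirs(root string) error {
	root = filepath.Clean(root)
	for _, dir := range []string{
		root,
		filepath.Join(root, "pods"),
		filepath.Join(root, "plugins"),
	} {
		if err := os.MkdirAll(dir, 0750); err != nil {
			return fmt.Errorf("error creating %s: %v", dir, err)
		}
	}
	return nil
}

func main() {
	if err := setupDataDirs(filepath.Join(os.TempDir(), "kubelet-demo")); err != nil {
		log.Fatal(err)
	}
	fmt.Println("data dirs created")
}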
@@ -948,6 +975,7 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
return pods, nil
}

// GetNode returns the node info for the configured node name of this Kubelet.
func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return kl.initialNodeStatus()
@@ -1048,6 +1076,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.syncLoop(updates, kl)
}

// initialNodeStatus determines the initial node status, incorporating node
// labels and information from the cloud provider.
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
@@ -1237,6 +1267,7 @@ func (kl *Kubelet) relabelVolumes(pod *api.Pod, volumes kubecontainer.VolumeMap)
return nil
}

// makeMounts determines the mount points for the given container.
func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
// Kubernetes only mounts on /etc/hosts if :
// - container does not use hostNetwork and
@@ -1280,6 +1311,8 @@ func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName,
return mounts, nil
}

// makeHostsMount makes the mountpoint for the hosts file that the containers
// in a pod are injected with.
func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontainer.Mount, error) {
hostsFilePath := path.Join(podDir, "etc-hosts")
if err := ensureHostsFile(hostsFilePath, podIP, hostName, hostDomainName); err != nil {
@@ -1293,6 +1326,8 @@ func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontai
}, nil
}

// ensureHostsFile ensures that the given host file has an up-to-date ip, host
// name, and domain name.
func ensureHostsFile(fileName, hostIP, hostName, hostDomainName string) error {
if _, err := os.Stat(fileName); os.IsExist(err) {
glog.V(4).Infof("kubernetes-managed etc-hosts file exits. Will not be recreated: %q", fileName)
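ensureHostsFile, per its new godoc, writes a pod-scoped hosts file containing the pod IP, host name, and domain name, and leaves an existing file alone. A rough sketch of generating such a file (the exact set of loopback entries and the file layout are assumptions, not copied from the kubelet):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// hostsFileContent builds a minimal hosts file: assumed loopback entries
// plus a line mapping the pod IP to its host name and FQDN.
func hostsFileContent(hostIP, hostName, hostDomainName string) []byte {
	content := "127.0.0.1\tlocalhost\n::1\tlocalhost ip6-localhost ip6-loopback\n"
	if hostDomainName != "" {
		content += fmt.Sprintf("%s\t%s.%s %s\n", hostIP, hostName, hostDomainName, hostName)
	} else {
		content += fmt.Sprintf("%s\t%s\n", hostIP, hostName)
	}
	return []byte(content)
}

// ensureHostsFile writes the file only if it does not already exist,
// mirroring the "will not be recreated" behavior logged above.
func ensureHostsFile(fileName, hostIP, hostName, hostDomainName string) error {
	if _, err := os.Stat(fileName); err == nil {
		return nil // already present; leave it alone
	}
	return os.WriteFile(fileName, hostsFileContent(hostIP, hostName, hostDomainName), 0644)
}

func main() {
	path := filepath.Join(os.TempDir(), "etc-hosts-demo")
	if err := ensureHostsFile(path, "10.0.0.5", "mypod", "default.pod.cluster.local"); err != nil {
		fmt.Println("error:", err)
		return
	}
	data, _ := os.ReadFile(path)
	fmt.Print(string(data))
}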
@@ -1475,7 +1510,7 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
return m, nil
}

// Make the service environment variables for a pod in the given namespace.
// Make the environment variables for a pod in the given namespace.
func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Container, podIP string) ([]kubecontainer.EnvVar, error) {
var result []kubecontainer.EnvVar
// Note: These are added to the docker.Config, but are not included in the checksum computed
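getServiceEnvVarMap and makeEnvironmentVariables build the Docker-links-style service environment variables for a pod. As a hedged illustration of the naming convention (FOO_SERVICE_HOST / FOO_SERVICE_PORT; the full variable set produced by the kubelet is not shown in this diff):

package main

import (
	"fmt"
	"strings"
)

// serviceEnv returns the conventional *_SERVICE_HOST / *_SERVICE_PORT pair
// for one service: dashes become underscores and the name is upper-cased.
// Treat this as a sketch of the convention, not the kubelet's code.
func serviceEnv(name, clusterIP string, port int) map[string]string {
	key := strings.ToUpper(strings.ReplaceAll(name, "-", "_"))
	return map[string]string{
		key + "_SERVICE_HOST": clusterIP,
		key + "_SERVICE_PORT": fmt.Sprintf("%d", port),
	}
}

func main() {
	for k, v := range serviceEnv("my-db", "10.0.0.11", 5432) {
		fmt.Printf("%s=%s\n", k, v)
	}
}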
@@ -1570,6 +1605,8 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Contain
return result, nil
}

// podFieldSelectorRuntimeValue returns the runtime value of the given
// selector for a pod.
func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *api.ObjectFieldSelector, pod *api.Pod, podIP string) (string, error) {
internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "")
if err != nil {
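podFieldSelectorRuntimeValue resolves a downward-API field selector (for example metadata.name or status.podIP) into the concrete value for the pod. A small sketch of that resolution step, using an illustrative pod struct rather than the real api.Pod:

package main

import "fmt"

// pod is a stand-in for api.Pod with just the fields used below.
type pod struct {
	Name      string
	Namespace string
	PodIP     string
}

// fieldValue maps a handful of assumed downward-API field paths to values.
func fieldValue(fieldPath string, p pod) (string, error) {
	switch fieldPath {
	case "metadata.name":
		return p.Name, nil
	case "metadata.namespace":
		return p.Namespace, nil
	case "status.podIP":
		return p.PodIP, nil
	default:
		return "", fmt.Errorf("unsupported field selector %q", fieldPath)
	}
}

func main() {
	p := pod{Name: "web-0", Namespace: "default", PodIP: "10.1.2.3"}
	v, _ := fieldValue("status.podIP", p)
	fmt.Println(v) // 10.1.2.3
}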
@@ -1656,6 +1693,10 @@ type dnsScrubber interface {
ScrubDNS(nameservers, searches []string) (nsOut, srchOut []string)
}

// parseResolvConf reads a resolv.conf file from the given reader, and parses
// it into nameservers and searches, possibly returning an error. The given
// dnsScrubber allows cloud providers to post-process dns names.
// TODO: move to utility package
func parseResolvConf(reader io.Reader, dnsScrubber dnsScrubber) (nameservers []string, searches []string, err error) {
file, err := ioutil.ReadAll(reader)
if err != nil {
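parseResolvConf's new godoc says it splits a resolv.conf into nameserver and search entries. A self-contained sketch of that parsing (keyword handling only; the kubelet's cloud-provider scrubbing hook is omitted):

package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseResolvConf pulls nameserver and search entries out of resolv.conf
// text. Comment lines and unknown keywords are ignored.
func parseResolvConf(data string) (nameservers, searches []string) {
	scanner := bufio.NewScanner(strings.NewReader(data))
	for scanner.Scan() {
		fields := strings.Fields(scanner.Text())
		if len(fields) == 0 || strings.HasPrefix(fields[0], "#") {
			continue
		}
		switch fields[0] {
		case "nameserver":
			if len(fields) >= 2 {
				nameservers = append(nameservers, fields[1])
			}
		case "search":
			// Later search lines replace earlier ones, as resolv.conf does.
			searches = fields[1:]
		}
	}
	return nameservers, searches
}

func main() {
	conf := "# managed by test\nnameserver 10.0.0.10\nsearch default.svc.cluster.local svc.cluster.local\n"
	ns, s := parseResolvConf(conf)
	fmt.Println(ns, s)
}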
@@ -1705,8 +1746,6 @@ func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *
return kl.containerRuntime.KillPod(pod, p)
}

type empty struct{}

// makePodDataDirs creates the dirs for the pod datas.
func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
uid := pod.UID
@@ -1722,12 +1761,42 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil
}

// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// pod - the pod to sync
// mirrorPod - the mirror pod for the pod to sync, if it is a static pod
// podStatus - the current status (TODO: always from the status manager?)
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE)
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an api.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Mount volumes and update the volume manager
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by the API server.
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}

// Record pod worker start latency if being created
// TODO: make pod workers record their own latencies
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
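The long docblock above describes syncPod as a transaction script: an ordered sequence of steps where the first error aborts the sync and is retried on the next call. A compressed sketch of that control-flow shape (the step names are hypothetical stand-ins for the calls listed in the comment):

package main

import (
	"errors"
	"fmt"
)

// syncStep is a stand-in for one step of the workflow in the godoc above
// (update status, kill if needed, make data dirs, mount volumes, ...).
type syncStep func() error

// syncPod runs the steps in order and returns on the first error, so the
// whole sync is retried on the next call, as the comment describes.
func syncPod(steps []syncStep) error {
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	steps := []syncStep{
		func() error { fmt.Println("update status"); return nil },
		func() error { fmt.Println("make data dirs"); return nil },
		func() error { return errors.New("mount volumes: disk full") },
		func() error { fmt.Println("never reached"); return nil },
	}
	fmt.Println("sync error:", syncPod(steps))
}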
@@ -1738,16 +1807,19 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
}
}

// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)

// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
// Update status in the status manager
kl.statusManager.SetPodStatus(pod, apiPodStatus)

// Kill pods we can't run.
// Kill pod if it should not be running
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
if err := kl.killPod(pod, nil, podStatus); err != nil {
utilruntime.HandleError(err)
@@ -1779,12 +1851,13 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
}
}

// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
return err
}

// Mount volumes.
// Mount volumes and update the volume manager
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
ref, errGetRef := api.GetReference(pod)
@@ -1796,21 +1869,26 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
}
kl.volumeManager.SetVolumes(pod.UID, podVolumes)

// Fetch the pull secrets for the pod
pullSecrets, err := kl.getPullSecretsForPod(pod)
if err != nil {
glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
return err
}

// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err = result.Error(); err != nil {
return err
}

// early successful exit if pod is not bandwidth-constrained
if !kl.shapingEnabled() {
return nil
}

// Update the traffic shaping for the pod's ingress and egress limits
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return err
@@ -1830,6 +1908,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
return nil
}

// returns whether the pod uses the host network namespace.
func podUsesHostNetwork(pod *api.Pod) bool {
return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork
}
@@ -1852,9 +1931,9 @@ func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
return pullSecrets, nil
}

// Return name of a volume. When the volume is a PersistentVolumeClaim,
// it returns name of the real PersistentVolume bound to the claim.
// It returns errror when the clam is not bound yet.
// resolveVolumeName returns the name of the persistent volume (PV) claimed by
// a persistent volume claim (PVC) or an error if the claim is not bound.
// Returns nil if the volume does not use a PVC.
func (kl *Kubelet) resolveVolumeName(pod *api.Pod, volume *api.Volume) (string, error) {
claimSource := volume.VolumeSource.PersistentVolumeClaim
if claimSource != nil {
@@ -1893,8 +1972,8 @@ func (kl *Kubelet) getDesiredVolumes(pods []*api.Pod) map[string]api.Volume {
return desiredVolumes
}

// cleanupOrphanedPodDirs removes a pod directory if the pod is not in the
// desired set of pods and there is no running containers in the pod.
// cleanupOrphanedPodDirs removes the volumes of pods that should not be
// running and that have no containers running.
func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
active := sets.NewString()
for _, pod := range pods {
@@ -1926,6 +2005,8 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod, runningPods []*kubeco
return utilerrors.NewAggregate(errlist)
}

// cleanupBandwidthLimits updates the status of bandwidth-limited containers
// and ensures that only the appropriate CIDRs are active on the node.
func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
if kl.shaper == nil {
return nil
@@ -2082,6 +2163,8 @@ func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
return false
}

// filterOutTerminatedPods returns the given pods which the status manager
// does not consider failed or succeeded.
func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod {
var filteredPods []*api.Pod
for _, p := range pods {
@@ -2106,6 +2189,12 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}

// deletePod deletes the pod from the internal state of the kubelet by:
// 1. stopping the associated pod worker asynchronously
// 2. signaling to kill the pod by sending on the podKillingCh channel
//
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(pod *api.Pod) error {
if pod == nil {
return fmt.Errorf("deletePod does not allow nil pod")
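The new deletePod comment describes two steps: stop the pod worker asynchronously and signal the pod killer over a channel (podKillingCh). A minimal sketch of that hand-off pattern (the worker registry and type names here are illustrative, not the kubelet's types):

package main

import (
	"fmt"
	"sync"
	"time"
)

type podKill struct{ name string }

type kubeletSketch struct {
	mu           sync.Mutex
	workers      map[string]bool // stand-in for per-pod workers
	podKillingCh chan podKill
}

// deletePod drops the pod's worker bookkeeping and signals the killer
// goroutine; the actual kill happens asynchronously, as the godoc says.
func (k *kubeletSketch) deletePod(name string) error {
	k.mu.Lock()
	if !k.workers[name] {
		k.mu.Unlock()
		return fmt.Errorf("pod %q not found", name)
	}
	delete(k.workers, name)
	k.mu.Unlock()
	k.podKillingCh <- podKill{name: name}
	return nil
}

// podKiller drains the channel and "kills" pods in the background.
func (k *kubeletSketch) podKiller() {
	for kill := range k.podKillingCh {
		fmt.Println("killing pod", kill.name)
	}
}

func main() {
	k := &kubeletSketch{
		workers:      map[string]bool{"web-0": true},
		podKillingCh: make(chan podKill, 1),
	}
	go k.podKiller()
	if err := k.deletePod("web-0"); err != nil {
		fmt.Println(err)
	}
	time.Sleep(50 * time.Millisecond) // let the killer goroutine print
}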
@@ -2136,6 +2225,9 @@ func (kl *Kubelet) deletePod(pod *api.Pod) error {
return nil
}

// empty is a placeholder type used to implement a set
type empty struct{}

// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
@@ -2255,6 +2347,9 @@ func (kl *Kubelet) podKiller() {
}
}

// podsByCreationTime makes an array of pods sortable by their creation
// timestamps.
// TODO: move into util package
type podsByCreationTime []*api.Pod

func (s podsByCreationTime) Len() int {
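podsByCreationTime is the usual sort.Interface idiom: a named slice type with Len/Swap/Less, where Less compares creation timestamps. A self-contained example of the same idiom over a simplified pod type:

package main

import (
	"fmt"
	"sort"
	"time"
)

// fakePod stands in for api.Pod; only the creation timestamp matters here.
type fakePod struct {
	Name    string
	Created time.Time
}

// podsByCreationTime sorts oldest-first, mirroring the idiom in the diff.
type podsByCreationTime []fakePod

func (s podsByCreationTime) Len() int      { return len(s) }
func (s podsByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s podsByCreationTime) Less(i, j int) bool {
	return s[i].Created.Before(s[j].Created)
}

func main() {
	now := time.Now()
	pods := podsByCreationTime{
		{Name: "new", Created: now},
		{Name: "old", Created: now.Add(-time.Hour)},
	}
	sort.Sort(pods)
	fmt.Println(pods[0].Name, pods[1].Name) // old new
}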
@@ -2313,6 +2408,8 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool {
return predicates.PodMatchesNodeLabels(pod, node)
}

// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manager.
func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
kl.recorder.Eventf(pod, api.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(pod, api.PodStatus{
@@ -2406,11 +2503,45 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
}
}

func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. houseKeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
kl.syncLoopMonitor.Store(kl.clock.Now())
select {
case u, open := <-updates:
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
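The docblock above stresses that a Go select with several ready channels picks a case pseudorandomly rather than top-to-bottom. The short program below demonstrates this: with both buffered channels already holding a value, repeated iterations hit both cases in no fixed order.

package main

import "fmt"

func main() {
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		a := make(chan int, 1)
		b := make(chan int, 1)
		a <- 1
		b <- 2
		// Both cases are ready, so Go chooses one pseudorandomly.
		select {
		case <-a:
			counts["a"]++
		case <-b:
			counts["b"]++
		}
	}
	// Expect both counters to be non-zero, roughly balanced.
	fmt.Println(counts)
}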
@@ -2439,6 +2570,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
glog.Errorf("Kubelet does not support snapshot update")
}
case e := <-plegCh:
// PLEG event for a pod; sync it.
pod, ok := kl.podManager.GetPodByUID(e.ID)
if !ok {
// If the pod no longer exists, ignore the event.
@@ -2454,6 +2586,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
}
handler.HandlePodSyncs([]*api.Pod{pod})
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
@@ -2461,8 +2594,9 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
kl.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
// We only care about failures (signalling container death) here.
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.

// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
@@ -2475,6 +2609,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
handler.HandlePodSyncs([]*api.Pod{pod})
}
case <-housekeepingCh:
// It's time to do housekeeping
if !kl.allSourcesReady() {
// If the sources aren't ready, skip housekeeping, as we may
// accidentally delete pods from unready sources.
@@ -2490,10 +2625,12 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
return true
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
if kl.podIsTerminated(pod) {
if pod.DeletionTimestamp != nil {
// If the pod is in a termianted state, there is no pod worker to
// If the pod is in a terminated state, there is no pod worker to
// handle the work item. Check if the DeletionTimestamp has been
// set, and force a status update to trigger a pod deletion request
// to the apiserver.
@@ -2521,6 +2658,8 @@ func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
}
}

// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
start := kl.clock.Now()
sort.Sort(podsByCreationTime(pods))
@@ -2547,6 +2686,8 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
}
}

// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
@@ -2562,6 +2703,8 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
}
}

// HandlePodDeletions is the callback in the SyncHandler interface for pods
// being deleted from a config source.
func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
@@ -2579,6 +2722,8 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
}
}

// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled.
func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
for _, pod := range pods {
// Update the pod in pod manager, status manager will do periodically reconcile according
@@ -2587,6 +2732,8 @@ func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
}
}

// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
@@ -2595,6 +2742,7 @@ func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
}
}

// LatestLoopEntryTime returns the last time in the sync loop monitor.
func (kl *Kubelet) LatestLoopEntryTime() time.Time {
val := kl.syncLoopMonitor.Load()
if val == nil {
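syncLoopIteration stores the current time in kl.syncLoopMonitor at each pass, and LatestLoopEntryTime loads it back, returning the zero time when nothing has been stored yet. The same pattern with sync/atomic.Value, as a standalone sketch:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// loopMonitor records when the loop last ran; safe for concurrent readers.
type loopMonitor struct {
	last atomic.Value // holds a time.Time
}

func (m *loopMonitor) markIteration() { m.last.Store(time.Now()) }

// latest returns the zero time if the loop has never run, mirroring
// LatestLoopEntryTime's nil check in the diff above.
func (m *loopMonitor) latest() time.Time {
	val := m.last.Load()
	if val == nil {
		return time.Time{}
	}
	return val.(time.Time)
}

func main() {
	m := &loopMonitor{}
	fmt.Println("before any iteration:", m.latest().IsZero()) // true
	m.markIteration()
	fmt.Println("after one iteration: ", m.latest().IsZero()) // false
}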
@@ -2603,6 +2751,7 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
return val.(time.Time)
}

// PLEGHealthCheck returns whether the PLEG is healthy.
func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
return kl.pleg.Healthy()
}
@@ -2729,6 +2878,8 @@ func (kl *Kubelet) GetRunningPods() ([]*api.Pod, error) {
return apiPods, nil
}

// GetPodByFullName gets the pod with the given 'full' name, which
// incorporates the namespace as well as whether the pod was found.
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
return kl.podManager.GetPodByFullName(podFullName)
}
@@ -2739,6 +2890,10 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
return kl.podManager.GetPodByName(namespace, name)
}

// updateRuntimeUp calls the container runtime status callback, initializing
// the runtime dependent modules when the container runtime first comes up,
// and returns an error if the status check fails. If the status check is OK,
// update the container runtime uptime in the kubelet runtimeState.
func (kl *Kubelet) updateRuntimeUp() {
if err := kl.containerRuntime.Status(); err != nil {
glog.Errorf("Container runtime sanity check failed: %v", err)
@@ -2787,6 +2942,8 @@ func (kl *Kubelet) updateNodeStatus() error {
return fmt.Errorf("update node status exceeds retry count")
}

// recordNodeStatusEvent records an event of the given type with the given
// message for the node.
func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
// TODO: This requires a transaction, either both node status is updated
@@ -2794,6 +2951,11 @@ func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) {
kl.recorder.Eventf(kl.nodeRef, eventtype, event, "Node %s status is now: %s", kl.nodeName, event)
}

// syncNetworkStatus updates the network state, ensuring that the network is
// configured correctly if the kubelet is set to configure cbr0:
// * handshake flannel helper if the flannel experimental overlay is being used.
// * ensure that iptables masq rules are setup
// * reconcile cbr0 with the pod CIDR
func (kl *Kubelet) syncNetworkStatus() {
var err error
if kl.configureCBR0 {
@@ -3275,6 +3437,8 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
}
}

// generateAPIPodStatus creates the final API pod status for a pod, given the
// internal pod status.
func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
// TODO: Consider include the container information.
@@ -3310,6 +3474,9 @@ func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.P
return *s
}

// convertStatusToAPIStatus creates an api PodStatus for the given pod from
// the given internal pod status. It is purely transformative and does not
// alter the kubelet state at all.
func (kl *Kubelet) convertStatusToAPIStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) *api.PodStatus {
var apiPodStatus api.PodStatus
uid := pod.UID
@@ -3474,6 +3641,8 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain
return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty)
}

// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
podUID = kl.podManager.TranslatePodUID(podUID)

@@ -3509,10 +3678,12 @@ func (kl *Kubelet) BirthCry() {
kl.recorder.Eventf(kl.nodeRef, api.EventTypeNormal, kubecontainer.StartingKubelet, "Starting kubelet.")
}

// StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}

// ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval
}
@@ -3544,10 +3715,13 @@ func (kl *Kubelet) GetContainerInfoV2(name string, options cadvisorapiv2.Request
return kl.cadvisor.ContainerInfoV2(name, options)
}

// DockerImagesFsInfo returns information about docker image fs usage from
// cadvisor.
func (kl *Kubelet) DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error) {
return kl.cadvisor.DockerImagesFsInfo()
}

// RootFsInfo returns info about the root fs from cadvisor.
func (kl *Kubelet) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
return kl.cadvisor.RootFsInfo()
}
@@ -3579,10 +3753,12 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
return kl.machineInfo, nil
}

// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime)
}

// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
}
@@ -3593,6 +3769,8 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}

// updatePodCIDR updates the pod CIDR in the runtime state if it is different
// from the current CIDR.
func (kl *Kubelet) updatePodCIDR(cidr string) {
if kl.runtimeState.podCIDR() == cidr {
return
@@ -3608,6 +3786,7 @@ func (kl *Kubelet) updatePodCIDR(cidr string) {
}
}

// shapingEnabled returns whether traffic shaping is enabled.
func (kl *Kubelet) shapingEnabled() bool {
// Disable shaping if a network plugin is defined and supports shaping
if kl.networkPlugin != nil && kl.networkPlugin.Capabilities().Has(network.NET_PLUGIN_CAPABILITY_SHAPING) {
@@ -3616,6 +3795,7 @@ func (kl *Kubelet) shapingEnabled() bool {
return true
}

// GetNodeConfig returns the container manager node config.
func (kl *Kubelet) GetNodeConfig() cm.NodeConfig {
return kl.containerManager.GetNodeConfig()
}