From b9f0e8c610fe496e4015561740eb3d4a63781a53 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Thu, 28 Apr 2016 00:26:36 -0400 Subject: [PATCH] Add godoc for some kubelet funcs --- pkg/kubelet/kubelet.go | 220 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 200 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3d89c070848..871d04b69a8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -115,6 +115,7 @@ const ( // Period for performing global cleanup tasks. housekeepingPeriod = time.Second * 2 + // The path in containers' filesystems where the hosts file is mounted. etcHostsPath = "/etc/hosts" // Capacity of the channel for receiving pod lifecycle events. This number @@ -131,7 +132,7 @@ const ( // and/or when there are many container changes in one cycle. plegRelistPeriod = time.Second * 1 - // backOffPeriod is the period to back off when pod syncing resulting in an + // backOffPeriod is the period to back off when pod syncing results in an // error. It is also used as the base period for the exponential backoff // container restarts and image pulls. backOffPeriod = time.Second * 10 @@ -494,6 +495,8 @@ func NewMainKubelet( return klet, nil } +// effectiveHairpinMode determines the effective hairpin mode given the +// configured mode, container runtime, and whether cbr0 should be configured. func effectiveHairpinMode(hairpinMode componentconfig.HairpinMode, containerRuntime string, configureCBR0 bool) (componentconfig.HairpinMode, error) { // The hairpin mode setting doesn't matter if: // - We're not using a bridge network. This is hard to check because we might @@ -542,14 +545,23 @@ type Kubelet struct { runtimeCache kubecontainer.RuntimeCache kubeClient clientset.Interface rootDirectory string - podWorkers PodWorkers + // podWorkers handle syncing Pods in response to events. + podWorkers PodWorkers + + // resyncInterval is the interval between periodic full reconciliations of + // pods on this node. resyncInterval time.Duration - sourcesReady SourcesReadyFn + + // sourcesReady is a function to call to determine if all config sources + // are ready. + sourcesReady SourcesReadyFn // sourcesSeen records the sources seen by kubelet. This set is not thread // safe and should only be access by the main kubelet syncloop goroutine. sourcesSeen sets.String + // podManager is a facade that abstracts away the various sources of pods + // this Kubelet services. podManager kubepod.Manager // Needed to report events for containers belonging to deleted/modified pods. @@ -582,10 +594,14 @@ type Kubelet struct { // If non-nil, use this for container DNS server. clusterDNS net.IP + // masterServiceNamespace is the namespace that the master service is exposed in. masterServiceNamespace string - serviceLister serviceLister - nodeLister nodeLister - nodeInfo predicates.NodeInfo + // serviceLister knows how to list services + serviceLister serviceLister + // nodeLister knows how to list nodes + nodeLister nodeLister + // nodeInfo knows how to get information about the node for this kubelet. + nodeInfo predicates.NodeInfo // a list of node labels to register nodeLabels map[string]string @@ -662,6 +678,7 @@ type Kubelet struct { // Store kubecontainer.PodStatus for all pods. podCache kubecontainer.Cache + // os is a facade for various syscalls that need to be mocked during testing. os kubecontainer.OSInterface // Watcher of out of memory events. @@ -796,11 +813,14 @@ func (kl *Kubelet) validateNodeIP() error { return fmt.Errorf("Node IP: %q not found in the host's network interfaces", kl.nodeIP.String()) } +// allSourcesReady returns whether all seen pod sources are ready. func (kl *Kubelet) allSourcesReady() bool { // Make a copy of the sourcesSeen list because it's not thread-safe. return kl.sourcesReady(sets.NewString(kl.sourcesSeen.List()...)) } +// addSource adds a new pod source to the list of sources seen. +// TODO: reassess in the context of kubernetes/24810 func (kl *Kubelet) addSource(source string) { kl.sourcesSeen.Insert(source) } @@ -840,6 +860,8 @@ func (kl *Kubelet) GetPodDir(podUID types.UID) string { return kl.getPodDir(podUID) } +// getPodDir returns the full path to the per-pod directory for the pod with +// the given UID. func (kl *Kubelet) getPodDir(podUID types.UID) string { // Backwards compat. The "old" stuff should be removed before 1.0 // release. The thinking here is this: @@ -911,6 +933,7 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string { return newPath } +// dirExists returns true if the path exists and represents a directory. func dirExists(path string) bool { s, err := os.Stat(path) if err != nil { @@ -919,6 +942,10 @@ func dirExists(path string) bool { return s.IsDir() } +// setupDataDirs creates: +// 1. the root directory +// 2. the pods directory +// 3. the plugins directory func (kl *Kubelet) setupDataDirs() error { kl.rootDirectory = path.Clean(kl.rootDirectory) if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil { @@ -948,6 +975,7 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) { return pods, nil } +// GetNode returns the node info for the configured node name of this Kubelet. func (kl *Kubelet) GetNode() (*api.Node, error) { if kl.standaloneMode { return kl.initialNodeStatus() @@ -1048,6 +1076,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { kl.syncLoop(updates, kl) } +// initialNodeStatus determines the initial node status, incorporating node +// labels and information from the cloud provider. func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { node := &api.Node{ ObjectMeta: api.ObjectMeta{ @@ -1237,6 +1267,7 @@ func (kl *Kubelet) relabelVolumes(pod *api.Pod, volumes kubecontainer.VolumeMap) return nil } +// makeMounts determines the mount points for the given container. func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) { // Kubernetes only mounts on /etc/hosts if : // - container does not use hostNetwork and @@ -1280,6 +1311,8 @@ func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, return mounts, nil } +// makeHostsMount makes the mountpoint for the hosts file that the containers +// in a pod are injected with. func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontainer.Mount, error) { hostsFilePath := path.Join(podDir, "etc-hosts") if err := ensureHostsFile(hostsFilePath, podIP, hostName, hostDomainName); err != nil { @@ -1293,6 +1326,8 @@ func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontai }, nil } +// ensureHostsFile ensures that the given host file has an up-to-date ip, host +// name, and domain name. func ensureHostsFile(fileName, hostIP, hostName, hostDomainName string) error { if _, err := os.Stat(fileName); os.IsExist(err) { glog.V(4).Infof("kubernetes-managed etc-hosts file exits. Will not be recreated: %q", fileName) @@ -1475,7 +1510,7 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) { return m, nil } -// Make the service environment variables for a pod in the given namespace. +// Make the environment variables for a pod in the given namespace. func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Container, podIP string) ([]kubecontainer.EnvVar, error) { var result []kubecontainer.EnvVar // Note: These are added to the docker.Config, but are not included in the checksum computed @@ -1570,6 +1605,8 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Contain return result, nil } +// podFieldSelectorRuntimeValue returns the runtime value of the given +// selector for a pod. func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *api.ObjectFieldSelector, pod *api.Pod, podIP string) (string, error) { internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "") if err != nil { @@ -1656,6 +1693,10 @@ type dnsScrubber interface { ScrubDNS(nameservers, searches []string) (nsOut, srchOut []string) } +// parseResolveConf reads a resolv.conf file from the given reader, and parses +// it into nameservers and searches, possibly returning an error. The given +// dnsScrubber allows cloud providers to post-process dns names. +// TODO: move to utility package func parseResolvConf(reader io.Reader, dnsScrubber dnsScrubber) (nameservers []string, searches []string, err error) { file, err := ioutil.ReadAll(reader) if err != nil { @@ -1705,8 +1746,6 @@ func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status * return kl.containerRuntime.KillPod(pod, p) } -type empty struct{} - // makePodDataDirs creates the dirs for the pod datas. func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { uid := pod.UID @@ -1722,12 +1761,42 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { return nil } +// syncPod is the transaction script for the sync of a single pod. +// +// Arguments: +// +// pod - the pod to sync +// mirrorPod - the mirror pod for the pod to sync, if it is a static pod +// podStatus - the current status (TODO: always from the status manager?) +// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE) +// +// The workflow is: +// * If the pod is being created, record pod worker start latency +// * Call generateAPIPodStatus to prepare an api.PodStatus for the pod +// * If the pod is being seen as running for the first time, record pod +// start latency +// * Update the status of the pod in the status manager +// * Kill the pod if it should not be running +// * Create a mirror pod if the pod is a static pod, and does not +// already have a mirror pod +// * Create the data directories for the pod if they do not exist +// * Mount volumes and update the volume manager +// * Fetch the pull secrets for the pod +// * Call the container runtime's SyncPod callback +// * Update the traffic shaping for the pod's ingress and egress limits +// +// If any step if this workflow errors, the error is returned, and is repeated +// on the next syncPod call. func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error { + // Latency measurements for the main workflow are relative to the + // (first time the pod was seen by the API server. var firstSeenTime time.Time if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok { firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get() } + // Record pod worker start latency if being created + // TODO: make pod workers record their own latencies if updateType == kubetypes.SyncPodCreate { if !firstSeenTime.IsZero() { // This is the first time we are syncing the pod. Record the latency @@ -1738,16 +1807,19 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont } } + // Generate final API pod status with pod and status manager status apiPodStatus := kl.generateAPIPodStatus(pod, podStatus) + // Record the time it takes for the pod to become running. existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID) if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning && !firstSeenTime.IsZero() { metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime)) } + // Update status in the status manager kl.statusManager.SetPodStatus(pod, apiPodStatus) - // Kill pods we can't run. + // Kill pod if it should not be running if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil { if err := kl.killPod(pod, nil, podStatus); err != nil { utilruntime.HandleError(err) @@ -1779,12 +1851,13 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont } } + // Make data directories for the pod if err := kl.makePodDataDirs(pod); err != nil { glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err) return err } - // Mount volumes. + // Mount volumes and update the volume manager podVolumes, err := kl.mountExternalVolumes(pod) if err != nil { ref, errGetRef := api.GetReference(pod) @@ -1796,21 +1869,26 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont } kl.volumeManager.SetVolumes(pod.UID, podVolumes) + // Fetch the pull secrets for the pod pullSecrets, err := kl.getPullSecretsForPod(pod) if err != nil { glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err) return err } + // Call the container runtime's SyncPod callback result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff) kl.reasonCache.Update(pod.UID, result) if err = result.Error(); err != nil { return err } + // early successful exit if pod is not bandwidth-constrained if !kl.shapingEnabled() { return nil } + + // Update the traffic shaping for the pod's ingress and egress limits ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) if err != nil { return err @@ -1830,6 +1908,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont return nil } +// returns whether the pod uses the host network namespace. func podUsesHostNetwork(pod *api.Pod) bool { return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork } @@ -1852,9 +1931,9 @@ func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) { return pullSecrets, nil } -// Return name of a volume. When the volume is a PersistentVolumeClaim, -// it returns name of the real PersistentVolume bound to the claim. -// It returns errror when the clam is not bound yet. +// resolveVolumeName returns the name of the persistent volume (PV) claimed by +// a persistent volume claim (PVC) or an error if the claim is not bound. +// Returns nil if the volume does not use a PVC. func (kl *Kubelet) resolveVolumeName(pod *api.Pod, volume *api.Volume) (string, error) { claimSource := volume.VolumeSource.PersistentVolumeClaim if claimSource != nil { @@ -1893,8 +1972,8 @@ func (kl *Kubelet) getDesiredVolumes(pods []*api.Pod) map[string]api.Volume { return desiredVolumes } -// cleanupOrphanedPodDirs removes a pod directory if the pod is not in the -// desired set of pods and there is no running containers in the pod. +// cleanupOrphanedPodDirs removes the volumes of pods that should not be +// running and that have no containers running. func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod, runningPods []*kubecontainer.Pod) error { active := sets.NewString() for _, pod := range pods { @@ -1926,6 +2005,8 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod, runningPods []*kubeco return utilerrors.NewAggregate(errlist) } +// cleanupBandwidthLimits updates the status of bandwidth-limited containers +// and ensures that only the the appropriate CIDRs are active on the node. func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error { if kl.shaper == nil { return nil @@ -2082,6 +2163,8 @@ func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool { return false } +// filterOutTerminatedPods returns the given pods which the status manager +// does not consider failed or succeeded. func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod { var filteredPods []*api.Pod for _, p := range pods { @@ -2106,6 +2189,12 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api. kl.statusManager.RemoveOrphanedStatuses(podUIDs) } +// deletePod deletes the pod from the internal state of the kubelet by: +// 1. stopping the associated pod worker asynchronously +// 2. signaling to kill the pod by sending on the podKillingCh channel +// +// deletePod returns an error if not all sources are ready or the pod is not +// found in the runtime cache. func (kl *Kubelet) deletePod(pod *api.Pod) error { if pod == nil { return fmt.Errorf("deletePod does not allow nil pod") @@ -2136,6 +2225,9 @@ func (kl *Kubelet) deletePod(pod *api.Pod) error { return nil } +// empty is a placeholder type used to implement a set +type empty struct{} + // HandlePodCleanups performs a series of cleanup work, including terminating // pod workers, killing unwanted pods, and removing orphaned volumes/pod // directories. @@ -2255,6 +2347,9 @@ func (kl *Kubelet) podKiller() { } } +// podsByCreationTime makes an array of pods sortable by their creation +// timestamps. +// TODO: move into util package type podsByCreationTime []*api.Pod func (s podsByCreationTime) Len() int { @@ -2313,6 +2408,8 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool { return predicates.PodMatchesNodeLabels(pod, node) } +// rejectPod records an event about the pod with the given reason and message, +// and updates the pod to the failed phase in the status manage. func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) { kl.recorder.Eventf(pod, api.EventTypeWarning, reason, message) kl.statusManager.SetPodStatus(pod, api.PodStatus{ @@ -2406,11 +2503,45 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand } } -func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler, +// syncLoopIteration reads from various channels and dispatches pods to the +// given handler. +// +// Arguments: +// 1. configCh: a channel to read config events from +// 2. handler: the SyncHandler to dispatch pods to +// 3. syncCh: a channel to read periodic sync events from +// 4. houseKeepingCh: a channel to read housekeeping events from +// 5. plegCh: a channel to read PLEG updates from +// +// Events are also read from the kubelet liveness manager's update channel. +// +// The workflow is to read from one of the channels, handle that event, and +// update the timestamp in the sync loop monitor. +// +// Here is an appropriate place to note that despite the syntactical +// similarity to the switch statement, the case statements in a select are +// evaluated in a pseudorandom order if there are multiple channels ready to +// read from when the select is evaluated. In other words, case statements +// are evaluated in random order, and you can not assume that the case +// statements evaluate in order if multiple channels have events. +// +// With that in mind, in truly no particular order, the different channels +// are handled as follows: +// +// * configCh: dispatch the pods for the config change to the appropriate +// handler callback for the event type +// * plegCh: update the runtime cache; sync pod +// * syncCh: sync all pods waiting for sync +// * houseKeepingCh: trigger cleanup of pods +// * liveness manager: sync pods that have failed or in which one or more +// containers have failed liveness checks +func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { kl.syncLoopMonitor.Store(kl.clock.Now()) select { - case u, open := <-updates: + case u, open := <-configCh: + // Update from a config source; dispatch it to the right handler + // callback. if !open { glog.Errorf("Update channel is closed. Exiting the sync loop.") return false @@ -2439,6 +2570,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler glog.Errorf("Kubelet does not support snapshot update") } case e := <-plegCh: + // PLEG event for a pod; sync it. pod, ok := kl.podManager.GetPodByUID(e.ID) if !ok { // If the pod no longer exists, ignore the event. @@ -2454,6 +2586,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler } handler.HandlePodSyncs([]*api.Pod{pod}) case <-syncCh: + // Sync pods waiting for sync podsToSync := kl.getPodsToSync() if len(podsToSync) == 0 { break @@ -2461,8 +2594,9 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync)) kl.HandlePodSyncs(podsToSync) case update := <-kl.livenessManager.Updates(): - // We only care about failures (signalling container death) here. if update.Result == proberesults.Failure { + // The liveness manager detected a failure; sync the pod. + // We should not use the pod from livenessManager, because it is never updated after // initialization. pod, ok := kl.podManager.GetPodByUID(update.PodUID) @@ -2475,6 +2609,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler handler.HandlePodSyncs([]*api.Pod{pod}) } case <-housekeepingCh: + // It's time to do housekeeping if !kl.allSourcesReady() { // If the sources aren't ready, skip housekeeping, as we may // accidentally delete pods from unready sources. @@ -2490,10 +2625,12 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler return true } +// dispatchWork starts the asynchronous sync of the pod in a pod worker. +// If the pod is terminated, dispatchWork func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) { if kl.podIsTerminated(pod) { if pod.DeletionTimestamp != nil { - // If the pod is in a termianted state, there is no pod worker to + // If the pod is in a terminated state, there is no pod worker to // handle the work item. Check if the DeletionTimestamp has been // set, and force a status update to trigger a pod deletion request // to the apiserver. @@ -2521,6 +2658,8 @@ func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) { } } +// HandlePodAdditions is the callback in SyncHandler for pods being added from +// a config source. func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) { start := kl.clock.Now() sort.Sort(podsByCreationTime(pods)) @@ -2547,6 +2686,8 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) { } } +// HandlePodUpdates is the callback in the SyncHandler interface for pods +// being updated from a config source. func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) { start := kl.clock.Now() for _, pod := range pods { @@ -2562,6 +2703,8 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) { } } +// HandlePodDeletions is the callback in the SyncHandler interface for pods +// being deleted from a config source. func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { start := kl.clock.Now() for _, pod := range pods { @@ -2579,6 +2722,8 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { } } +// HandlePodReconcile is the callback in the SyncHandler interface for pods +// that should be reconciled. func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) { for _, pod := range pods { // Update the pod in pod manager, status manager will do periodically reconcile according @@ -2587,6 +2732,8 @@ func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) { } } +// HandlePodSyncs is the callback in the syncHandler interface for pods +// that should be dispatched to pod workers for sync. func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) { start := kl.clock.Now() for _, pod := range pods { @@ -2595,6 +2742,7 @@ func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) { } } +// LatestLoopEntryTime returns the last time in the sync loop monitor. func (kl *Kubelet) LatestLoopEntryTime() time.Time { val := kl.syncLoopMonitor.Load() if val == nil { @@ -2603,6 +2751,7 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time { return val.(time.Time) } +// PLEGHealthCheck returns whether the PLEG is healty. func (kl *Kubelet) PLEGHealthCheck() (bool, error) { return kl.pleg.Healthy() } @@ -2729,6 +2878,8 @@ func (kl *Kubelet) GetRunningPods() ([]*api.Pod, error) { return apiPods, nil } +// GetPodByFullName gets the pod with the given 'full' name, which +// incorporates the namespace as well as whether the pod was found. func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) { return kl.podManager.GetPodByFullName(podFullName) } @@ -2739,6 +2890,10 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) { return kl.podManager.GetPodByName(namespace, name) } +// updateRuntimeUp calls the container runtime status callback, initializing +// the runtime dependent modules when the container runtime first comes up, +// and returns an error if the status check fails. If the status check is OK, +// update the container runtime uptime in the kubelet runtimeState. func (kl *Kubelet) updateRuntimeUp() { if err := kl.containerRuntime.Status(); err != nil { glog.Errorf("Container runtime sanity check failed: %v", err) @@ -2787,6 +2942,8 @@ func (kl *Kubelet) updateNodeStatus() error { return fmt.Errorf("update node status exceeds retry count") } +// recordNodeStatusEvent records an event of the given type with the given +// message for the node. func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) { glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName) // TODO: This requires a transaction, either both node status is updated @@ -2794,6 +2951,11 @@ func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) { kl.recorder.Eventf(kl.nodeRef, eventtype, event, "Node %s status is now: %s", kl.nodeName, event) } +// syncNetworkStatus updates the network state, ensuring that the network is +// configured correctly if the kubelet is set to configure cbr0: +// * handshake flannel helper if the flannel experimental overlay is being used. +// * ensure that iptables masq rules are setup +// * reconcile cbr0 with the pod CIDR func (kl *Kubelet) syncNetworkStatus() { var err error if kl.configureCBR0 { @@ -3275,6 +3437,8 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase { } } +// generateAPIPodStatus creates the final API pod status for a pod, given the +// internal pod status. func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus { glog.V(3).Infof("Generating status for %q", format.Pod(pod)) // TODO: Consider include the container information. @@ -3310,6 +3474,9 @@ func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.P return *s } +// convertStatusToAPIStatus creates an api PodStatus for the given pod from +// the given internal pod status. It is purely transformative and does not +// alter the kubelet state at all. func (kl *Kubelet) convertStatusToAPIStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) *api.PodStatus { var apiPodStatus api.PodStatus uid := pod.UID @@ -3474,6 +3641,8 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty) } +// AttachContainer uses the container runtime to attach the given streams to +// the given container. func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { podUID = kl.podManager.TranslatePodUID(podUID) @@ -3509,10 +3678,12 @@ func (kl *Kubelet) BirthCry() { kl.recorder.Eventf(kl.nodeRef, api.EventTypeNormal, kubecontainer.StartingKubelet, "Starting kubelet.") } +// StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server. func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration { return kl.streamingConnectionIdleTimeout } +// ResyncInterval returns the interval used for periodic syncs. func (kl *Kubelet) ResyncInterval() time.Duration { return kl.resyncInterval } @@ -3544,10 +3715,13 @@ func (kl *Kubelet) GetContainerInfoV2(name string, options cadvisorapiv2.Request return kl.cadvisor.ContainerInfoV2(name, options) } +// DockerImagesFsInfo returns information about docker image fs usage from +// cadvisor. func (kl *Kubelet) DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error) { return kl.cadvisor.DockerImagesFsInfo() } +// RootFsInfo returns info about the root fs from cadvisor. func (kl *Kubelet) RootFsInfo() (cadvisorapiv2.FsInfo, error) { return kl.cadvisor.RootFsInfo() } @@ -3579,10 +3753,12 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) { return kl.machineInfo, nil } +// ListenAndServe runs the kubelet HTTP server. func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) { server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime) } +// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime) } @@ -3593,6 +3769,8 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime { return kl.containerRuntime } +// updatePodCIDR updates the pod CIDR in the runtime state if it is different +// from the current CIDR. func (kl *Kubelet) updatePodCIDR(cidr string) { if kl.runtimeState.podCIDR() == cidr { return @@ -3608,6 +3786,7 @@ func (kl *Kubelet) updatePodCIDR(cidr string) { } } +// shapingEnabled returns whether traffic shaping is enabled. func (kl *Kubelet) shapingEnabled() bool { // Disable shaping if a network plugin is defined and supports shaping if kl.networkPlugin != nil && kl.networkPlugin.Capabilities().Has(network.NET_PLUGIN_CAPABILITY_SHAPING) { @@ -3616,6 +3795,7 @@ func (kl *Kubelet) shapingEnabled() bool { return true } +// GetNodeConfig returns the container manager node config. func (kl *Kubelet) GetNodeConfig() cm.NodeConfig { return kl.containerManager.GetNodeConfig() }