Mirror of https://github.com/k3s-io/kubernetes.git
kubelet: Remove dispatchWork and inline calls to UpdatePod
The HandlePod* methods are all structurally similar, but have accrued subtle differences over time. In general, the only point of a Handle* method is to process admission and to update the pod worker with the desired state from the kubelet's config (so that the pod worker can make it the actual state). Add a new GetPodAndMirrorPod() method that handles the case where the config pod is ambiguous (pod or mirror pod), and inline that structure into each handler. Add comments on questionable additions in the config methods for future improvement. Move the metric observation of container count closer to where pods are actually started (in the pod worker); a future change can likely move it into syncPod.
parent e7207c8546
commit 80b1aca580
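To make the new shape concrete, here is a minimal, self-contained sketch of the pattern the Handle* methods converge on after this change. The names used here (Pod, PodManager, PodWorkers, UpdatePodOptions, HandlePodUpdates) are simplified stand-ins for illustration only, not the kubelet's actual types; the real handlers are methods on *Kubelet, use *v1.Pod and kubetypes.SyncPodType, and appear in the diff below.

// Sketch only: stand-in types approximating the kubelet structures touched by
// this commit. The real code lives in pkg/kubelet; see the diff below.
package podsketch

import "time"

// Pod is a stand-in for *v1.Pod.
type Pod struct {
    UID      string
    Name     string
    IsMirror bool
}

// UpdatePodOptions mirrors the options each handler now passes straight to the
// pod worker instead of routing through the removed dispatchWork helper.
type UpdatePodOptions struct {
    Pod        *Pod
    MirrorPod  *Pod
    UpdateType string // e.g. SyncPodCreate, SyncPodUpdate, SyncPodSync
    StartTime  time.Time
}

// PodManager exposes the new GetPodAndMirrorPod lookup: given either member of
// a (static pod, mirror pod) pair, return both and report which one was passed.
type PodManager interface {
    GetPodAndMirrorPod(p *Pod) (pod, mirrorPod *Pod, wasMirror bool)
}

// PodWorkers accepts the desired state of a pod, as podWorkers.UpdatePod does.
type PodWorkers interface {
    UpdatePod(options UpdatePodOptions)
}

// HandlePodUpdates shows the structure the Handle* methods now share: resolve
// the pod/mirror pair, skip mirror pods with no corresponding static pod, and
// hand the desired state directly to the pod worker.
func HandlePodUpdates(pm PodManager, workers PodWorkers, pods []*Pod) {
    start := time.Now()
    for _, p := range pods {
        pod, mirrorPod, wasMirror := pm.GetPodAndMirrorPod(p)
        if wasMirror && pod == nil {
            // Orphaned mirror pod: there is nothing to sync.
            continue
        }
        workers.UpdatePod(UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: "SyncPodUpdate", // each handler supplies its own sync type
            StartTime:  start,
        })
    }
}

Each real handler differs only in the sync type it passes (SyncPodCreate, SyncPodUpdate, SyncPodSync) and in whether a mirror pod triggers an update of the corresponding static pod or is skipped.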
@@ -2442,32 +2442,6 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle
     handler.HandlePodSyncs([]*v1.Pod{pod})
 }
 
-// dispatchWork starts the asynchronous sync of the pod in a pod worker.
-// If the pod has completed termination, dispatchWork will perform no action.
-func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
-    // Run the sync in an async worker.
-    kl.podWorkers.UpdatePod(UpdatePodOptions{
-        Pod: pod,
-        MirrorPod: mirrorPod,
-        UpdateType: syncType,
-        StartTime: start,
-    })
-    // Note the number of containers for new pods.
-    if syncType == kubetypes.SyncPodCreate {
-        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
-    }
-}
-
-// TODO: handle mirror pods in a separate component (issue #17251)
-func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
-    // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
-    // corresponding static pod. Send update to the pod worker if the static
-    // pod exists.
-    if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
-        kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
-    }
-}
-
 // HandlePodAdditions is the callback in SyncHandler for pods being added from
 // a config source.
 func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
@@ -2485,8 +2459,18 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
         // the apiserver and no action (other than cleanup) is required.
         kl.podManager.AddPod(pod)
 
-        if kubetypes.IsMirrorPod(pod) {
-            kl.handleMirrorPod(pod, start)
+        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
+        if wasMirror {
+            if pod == nil {
+                klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
+                continue
+            }
+            kl.podWorkers.UpdatePod(UpdatePodOptions{
+                Pod: pod,
+                MirrorPod: mirrorPod,
+                UpdateType: kubetypes.SyncPodUpdate,
+                StartTime: start,
+            })
             continue
         }
 
@@ -2530,8 +2514,12 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
                 }
             }
         }
-        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
-        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
+        kl.podWorkers.UpdatePod(UpdatePodOptions{
+            Pod: pod,
+            MirrorPod: mirrorPod,
+            UpdateType: kubetypes.SyncPodCreate,
+            StartTime: start,
+        })
     }
 }
 
@@ -2541,12 +2529,21 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
     start := kl.clock.Now()
     for _, pod := range pods {
         kl.podManager.UpdatePod(pod)
-        if kubetypes.IsMirrorPod(pod) {
-            kl.handleMirrorPod(pod, start)
-            continue
+
+        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
+        if wasMirror {
+            if pod == nil {
+                klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
+                continue
+            }
         }
-        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
-        kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
+
+        kl.podWorkers.UpdatePod(UpdatePodOptions{
+            Pod: pod,
+            MirrorPod: mirrorPod,
+            UpdateType: kubetypes.SyncPodUpdate,
+            StartTime: start,
+        })
     }
 }
 
@@ -2556,10 +2553,22 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
     start := kl.clock.Now()
     for _, pod := range pods {
         kl.podManager.DeletePod(pod)
-        if kubetypes.IsMirrorPod(pod) {
-            kl.handleMirrorPod(pod, start)
+
+        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
+        if wasMirror {
+            if pod == nil {
+                klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
+                continue
+            }
+            kl.podWorkers.UpdatePod(UpdatePodOptions{
+                Pod: pod,
+                MirrorPod: mirrorPod,
+                UpdateType: kubetypes.SyncPodUpdate,
+                StartTime: start,
+            })
             continue
         }
+
         // Deletion is allowed to fail because the periodic cleanup routine
         // will trigger deletion again.
         if err := kl.deletePod(pod); err != nil {
@@ -2569,7 +2578,8 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 }
 
 // HandlePodReconcile is the callback in the SyncHandler interface for pods
-// that should be reconciled.
+// that should be reconciled. Pods are reconciled when only the status of the
+// pod is updated in the API.
 func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
     start := kl.clock.Now()
     for _, pod := range pods {
@@ -2577,13 +2587,37 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
         // to the pod manager.
         kl.podManager.UpdatePod(pod)
 
+        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
+        if wasMirror {
+            if pod == nil {
+                klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
+                continue
+            }
+            // Static pods should be reconciled the same way as regular pods
+        }
+
+        // TODO: reconcile being calculated in the config manager is questionable, and avoiding
+        // extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be
+        // merged (after resolving the next two TODOs).
+
         // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
+        // TODO: this should be unnecessary today - determine what is the cause for this to
+        // be different than Sync, or if there is a better place for it. For instance, we have
+        // needsReconcile in kubelet/config, here, and in status_manager.
         if status.NeedToReconcilePodReadiness(pod) {
-            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
-            kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
+            kl.podWorkers.UpdatePod(UpdatePodOptions{
+                Pod: pod,
+                MirrorPod: mirrorPod,
+                UpdateType: kubetypes.SyncPodSync,
+                StartTime: start,
+            })
         }
 
         // After an evicted pod is synced, all dead containers in the pod can be removed.
+        // TODO: this is questionable - status read is async and during eviction we already
+        // expect to not have some container info. The pod worker knows whether a pod has
+        // been evicted, so if this is about minimizing the time to react to an eviction we
+        // can do better. If it's about preserving pod status info we can also do better.
         if eviction.PodIsEvicted(pod.Status) {
             if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
                 kl.containerDeletor.deleteContainersInPod("", podStatus, true)
@@ -2597,8 +2631,24 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
 func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
     start := kl.clock.Now()
     for _, pod := range pods {
-        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
-        kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
+        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
+        if wasMirror {
+            if pod == nil {
+                klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
+                continue
+            }
+            // Syncing a mirror pod is a programmer error since the intent of sync is to
+            // batch notify all pending work. We should make it impossible to double sync,
+            // but for now log a programmer error to prevent accidental introduction.
+            klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID)
+            continue
+        }
+        kl.podWorkers.UpdatePod(UpdatePodOptions{
+            Pod: pod,
+            MirrorPod: mirrorPod,
+            UpdateType: kubetypes.SyncPodSync,
+            StartTime: start,
+        })
     }
 }
 
@@ -1144,10 +1144,14 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
 
             klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID)
             isStatic := kubetypes.IsStaticPod(desiredPod)
-            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod)
+            pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(desiredPod)
+            if pod == nil || wasMirror {
+                klog.V(2).InfoS("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID)
+                continue
+            }
             kl.podWorkers.UpdatePod(UpdatePodOptions{
                 UpdateType: kubetypes.SyncPodCreate,
-                Pod: desiredPod,
+                Pod: pod,
                 MirrorPod: mirrorPod,
             })
 
@@ -1234,7 +1238,6 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
 
     // Cleanup any backoff entries.
     kl.backOff.GC()
-
     return nil
 }
 
@@ -1340,15 +1343,26 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
         return fmt.Errorf("pod %q cannot be found - no logs available", name)
     }
 
-    podUID := pod.UID
-    if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
+    // TODO: this should be using the podWorker's pod store as authoritative, since
+    // the mirrorPod might still exist, the pod may have been force deleted but
+    // is still terminating (users should be able to view logs of force deleted static pods
+    // based on full name).
+    var podUID types.UID
+    pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
+    if wasMirror {
+        if pod == nil {
+            return fmt.Errorf("mirror pod %q does not have a corresponding pod", name)
+        }
         podUID = mirrorPod.UID
+    } else {
+        podUID = pod.UID
     }
 
     podStatus, found := kl.statusManager.GetPodStatus(podUID)
     if !found {
         // If there is no cached status, use the status from the
-        // apiserver. This is useful if kubelet has recently been
-        // restarted.
+        // config source (apiserver). This is useful if kubelet
+        // has recently been restarted.
         podStatus = pod.Status
     }
 
@@ -597,7 +597,11 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
         },
     }
     for _, pod := range pods {
-        kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now())
+        kubelet.podWorkers.UpdatePod(UpdatePodOptions{
+            Pod: pod,
+            UpdateType: kubetypes.SyncPodSync,
+            StartTime: time.Now(),
+        })
         if !got {
             t.Errorf("Should not skip completed pod %q", pod.Name)
         }
@@ -651,7 +655,11 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
     }
 
     for _, pod := range pods {
-        kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now())
+        kubelet.podWorkers.UpdatePod(UpdatePodOptions{
+            Pod: pod,
+            UpdateType: kubetypes.SyncPodSync,
+            StartTime: time.Now(),
+        })
         if !got {
             t.Errorf("Should not skip active pod %q", pod.Name)
         }
@@ -60,6 +60,10 @@ type Manager interface {
     // GetMirrorPodByPod returns the mirror pod for the given static pod and
     // whether it was known to the pod manager.
     GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
+    // GetPodAndMirrorPod returns the complement for a pod - if a pod was provided
+    // and a mirror pod can be found, return it. If a mirror pod is provided and
+    // the pod can be found, return it and true for wasMirror.
+    GetPodAndMirrorPod(*v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool)
     // GetPodsAndMirrorPods returns the set of pods, the set of mirror pods, and
     // the pod fullnames of any orphaned mirror pods.
     GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string)
@@ -324,3 +328,15 @@ func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) {
     pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]
     return pod, ok
 }
+
+func (pm *basicManager) GetPodAndMirrorPod(aPod *v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) {
+    pm.lock.RLock()
+    defer pm.lock.RUnlock()
+
+    fullName := kubecontainer.GetPodFullName(aPod)
+    if kubetypes.IsMirrorPod(aPod) {
+        return pm.podByFullName[fullName], aPod, true
+    }
+    return aPod, pm.mirrorPodByFullName[fullName], false
+
+}
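As a quick sanity check of the contract above, the following runnable toy mimics the two-map lookup that basicManager performs: both maps are keyed by the pod full name (name plus namespace, as kubecontainer.GetPodFullName produces), so the complement is a single map access. The types, the fullName helper, and the Mirror field standing in for the mirror-pod annotation check are illustrative assumptions, not the kubelet's own code.

// Toy model of GetPodAndMirrorPod: both maps are keyed by the pod "full name",
// so looking up the complement of a pod is a single map access.
package main

import "fmt"

type pod struct {
    Name      string
    Namespace string
    Mirror    bool // stand-in for the mirror-pod annotation check in kubetypes.IsMirrorPod
}

func fullName(p *pod) string { return p.Name + "_" + p.Namespace }

type manager struct {
    podByFullName       map[string]*pod
    mirrorPodByFullName map[string]*pod
}

func (m *manager) getPodAndMirrorPod(aPod *pod) (p, mirrorPod *pod, wasMirror bool) {
    fn := fullName(aPod)
    if aPod.Mirror {
        // A mirror pod was passed in: its complement is the static pod, if any.
        return m.podByFullName[fn], aPod, true
    }
    // A regular (or static) pod was passed in: its complement is the mirror pod, if any.
    return aPod, m.mirrorPodByFullName[fn], false
}

func main() {
    static := &pod{Name: "etcd", Namespace: "kube-system"}
    mirror := &pod{Name: "etcd", Namespace: "kube-system", Mirror: true}

    m := &manager{
        podByFullName:       map[string]*pod{fullName(static): static},
        mirrorPodByFullName: map[string]*pod{fullName(mirror): mirror},
    }

    // Passing the static pod returns its mirror pod and wasMirror=false.
    p, mp, wasMirror := m.getPodAndMirrorPod(static)
    fmt.Println(p == static, mp == mirror, wasMirror) // true true false

    // Passing the mirror pod returns the static pod and wasMirror=true.
    p, mp, wasMirror = m.getPodAndMirrorPod(mirror)
    fmt.Println(p == static, mp == mirror, wasMirror) // true true true

    // An orphaned mirror pod (no static pod registered) yields a nil pod.
    orphan := &pod{Name: "gone", Namespace: "default", Mirror: true}
    p, _, wasMirror = m.getPodAndMirrorPod(orphan)
    fmt.Println(p == nil, wasMirror) // true true
}

The orphaned-mirror case at the end is the situation the handlers above log with "Unable to find pod for mirror pod, skipping" and then skip.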
@@ -120,6 +120,22 @@ func (mr *MockManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.C
     return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockManager)(nil).GetMirrorPodByPod), arg0)
 }
 
+// GetPodAndMirrorPod mocks base method.
+func (m *MockManager) GetPodAndMirrorPod(arg0 *v1.Pod) (*v1.Pod, *v1.Pod, bool) {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "GetPodAndMirrorPod", arg0)
+    ret0, _ := ret[0].(*v1.Pod)
+    ret1, _ := ret[1].(*v1.Pod)
+    ret2, _ := ret[2].(bool)
+    return ret0, ret1, ret2
+}
+
+// GetPodAndMirrorPod indicates an expected call of GetPodAndMirrorPod.
+func (mr *MockManagerMockRecorder) GetPodAndMirrorPod(arg0 interface{}) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodAndMirrorPod", reflect.TypeOf((*MockManager)(nil).GetPodAndMirrorPod), arg0)
+}
+
 // GetPodByFullName mocks base method.
 func (m *MockManager) GetPodByFullName(arg0 string) (*v1.Pod, bool) {
     m.ctrl.T.Helper()
@@ -1181,6 +1181,12 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update
     status.startedAt = p.clock.Now()
     status.mergeLastUpdate(update.Options)
 
+    // If we are admitting the pod and it is new, record the count of containers
+    // TODO: We should probably move this into syncPod and add an execution count
+    // to the syncPod arguments, and this should be recorded on the first sync.
+    // Leaving it here complicates a particularly important loop.
+    metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers)))
+
     return ctx, update, true, true, true
 }
 