mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Typed static/mirror pod UID translation
This commit is contained in:
parent
bdf78980cc
commit
a286f25ff4
@ -26,7 +26,9 @@ import (
|
|||||||
// GetContainerInfo returns stats (from Cadvisor) for a container.
|
// GetContainerInfo returns stats (from Cadvisor) for a container.
|
||||||
func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
|
func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
|
||||||
|
|
||||||
podUID = kl.podManager.TranslatePodUID(podUID)
|
// Resolve and type convert back again.
|
||||||
|
// We need the static pod UID but the kubecontainer API works with types.UID.
|
||||||
|
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
|
||||||
|
|
||||||
pods, err := kl.runtimeCache.GetPods()
|
pods, err := kl.runtimeCache.GetPods()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1402,7 +1402,9 @@ func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, container
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
podUID = kl.podManager.TranslatePodUID(podUID)
|
// Resolve and type convert back again.
|
||||||
|
// We need the static pod UID but the kubecontainer API works with types.UID.
|
||||||
|
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
|
||||||
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
|
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
|
||||||
return pod.FindContainerByName(containerName), nil
|
return pod.FindContainerByName(containerName), nil
|
||||||
}
|
}
|
||||||
@ -1468,7 +1470,9 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
podUID = kl.podManager.TranslatePodUID(podUID)
|
// Resolve and type convert back again.
|
||||||
|
// We need the static pod UID but the kubecontainer API works with types.UID.
|
||||||
|
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
|
||||||
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
|
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
|
||||||
if pod.IsEmpty() {
|
if pod.IsEmpty() {
|
||||||
return fmt.Errorf("pod not found (%q)", podFullName)
|
return fmt.Errorf("pod not found (%q)", podFullName)
|
||||||
@ -1541,7 +1545,9 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
podUID = kl.podManager.TranslatePodUID(podUID)
|
// Resolve and type convert back again.
|
||||||
|
// We need the static pod UID but the kubecontainer API works with types.UID.
|
||||||
|
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
|
||||||
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
|
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
|
||||||
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
|
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
|
||||||
if pod.IsEmpty() {
|
if pod.IsEmpty() {
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/secret"
|
"k8s.io/kubernetes/pkg/kubelet/secret"
|
||||||
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Manager stores and manages access to pods, maintaining the mappings
|
// Manager stores and manages access to pods, maintaining the mappings
|
||||||
@ -44,7 +45,7 @@ import (
|
|||||||
type Manager interface {
|
type Manager interface {
|
||||||
// GetPods returns the regular pods bound to the kubelet and their spec.
|
// GetPods returns the regular pods bound to the kubelet and their spec.
|
||||||
GetPods() []*v1.Pod
|
GetPods() []*v1.Pod
|
||||||
// GetPodByName returns the (non-mirror) pod that matches full name, as well as
|
// GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
|
||||||
// whether the pod was found.
|
// whether the pod was found.
|
||||||
GetPodByFullName(podFullName string) (*v1.Pod, bool)
|
GetPodByFullName(podFullName string) (*v1.Pod, bool)
|
||||||
// GetPodByName provides the (non-mirror) pod that matches namespace and
|
// GetPodByName provides the (non-mirror) pod that matches namespace and
|
||||||
@ -83,10 +84,10 @@ type Manager interface {
|
|||||||
// All public-facing functions should perform this translation for UIDs
|
// All public-facing functions should perform this translation for UIDs
|
||||||
// because user may provide a mirror pod UID, which is not recognized by
|
// because user may provide a mirror pod UID, which is not recognized by
|
||||||
// internal Kubelet functions.
|
// internal Kubelet functions.
|
||||||
TranslatePodUID(uid types.UID) types.UID
|
TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
|
||||||
// GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
|
// GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
|
||||||
// UIDs and mirror pod UIDs to static pod UIDs.
|
// UIDs and mirror pod UIDs to static pod UIDs.
|
||||||
GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID)
|
GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
|
||||||
// IsMirrorPodOf returns true if mirrorPod is a correct representation of
|
// IsMirrorPodOf returns true if mirrorPod is a correct representation of
|
||||||
// pod; false otherwise.
|
// pod; false otherwise.
|
||||||
IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool
|
IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool
|
||||||
@ -94,7 +95,7 @@ type Manager interface {
|
|||||||
MirrorClient
|
MirrorClient
|
||||||
}
|
}
|
||||||
|
|
||||||
// basicManager is a functional Manger.
|
// basicManager is a functional Manager.
|
||||||
//
|
//
|
||||||
// All fields in basicManager are read-only and are updated calling SetPods,
|
// All fields in basicManager are read-only and are updated calling SetPods,
|
||||||
// AddPod, UpdatePod, or DeletePod.
|
// AddPod, UpdatePod, or DeletePod.
|
||||||
@ -103,16 +104,16 @@ type basicManager struct {
|
|||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
// Regular pods indexed by UID.
|
// Regular pods indexed by UID.
|
||||||
podByUID map[types.UID]*v1.Pod
|
podByUID map[kubetypes.ResolvedPodUID]*v1.Pod
|
||||||
// Mirror pods indexed by UID.
|
// Mirror pods indexed by UID.
|
||||||
mirrorPodByUID map[types.UID]*v1.Pod
|
mirrorPodByUID map[kubetypes.MirrorPodUID]*v1.Pod
|
||||||
|
|
||||||
// Pods indexed by full name for easy access.
|
// Pods indexed by full name for easy access.
|
||||||
podByFullName map[string]*v1.Pod
|
podByFullName map[string]*v1.Pod
|
||||||
mirrorPodByFullName map[string]*v1.Pod
|
mirrorPodByFullName map[string]*v1.Pod
|
||||||
|
|
||||||
// Mirror pod UID to pod UID map.
|
// Mirror pod UID to pod UID map.
|
||||||
translationByUID map[types.UID]types.UID
|
translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID
|
||||||
|
|
||||||
// basicManager is keeping secretManager and configMapManager up-to-date.
|
// basicManager is keeping secretManager and configMapManager up-to-date.
|
||||||
secretManager secret.Manager
|
secretManager secret.Manager
|
||||||
@ -137,11 +138,11 @@ func (pm *basicManager) SetPods(newPods []*v1.Pod) {
|
|||||||
pm.lock.Lock()
|
pm.lock.Lock()
|
||||||
defer pm.lock.Unlock()
|
defer pm.lock.Unlock()
|
||||||
|
|
||||||
pm.podByUID = make(map[types.UID]*v1.Pod)
|
pm.podByUID = make(map[kubetypes.ResolvedPodUID]*v1.Pod)
|
||||||
pm.podByFullName = make(map[string]*v1.Pod)
|
pm.podByFullName = make(map[string]*v1.Pod)
|
||||||
pm.mirrorPodByUID = make(map[types.UID]*v1.Pod)
|
pm.mirrorPodByUID = make(map[kubetypes.MirrorPodUID]*v1.Pod)
|
||||||
pm.mirrorPodByFullName = make(map[string]*v1.Pod)
|
pm.mirrorPodByFullName = make(map[string]*v1.Pod)
|
||||||
pm.translationByUID = make(map[types.UID]types.UID)
|
pm.translationByUID = make(map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
|
||||||
|
|
||||||
pm.updatePodsInternal(newPods...)
|
pm.updatePodsInternal(newPods...)
|
||||||
}
|
}
|
||||||
@ -157,7 +158,7 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// updatePodsInternal replaces the given pods in the current state of the
|
// updatePodsInternal replaces the given pods in the current state of the
|
||||||
// manager, updating the various indices. The caller is assumed to hold the
|
// manager, updating the various indices. The caller is assumed to hold the
|
||||||
// lock.
|
// lock.
|
||||||
func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
|
func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
@ -172,17 +173,21 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
|
|||||||
pm.configMapManager.RegisterPod(pod)
|
pm.configMapManager.RegisterPod(pod)
|
||||||
}
|
}
|
||||||
podFullName := kubecontainer.GetPodFullName(pod)
|
podFullName := kubecontainer.GetPodFullName(pod)
|
||||||
|
// This logic relies on a static pod and its mirror to have the same name.
|
||||||
|
// It is safe to type convert here due to the IsMirrorPod guard.
|
||||||
if IsMirrorPod(pod) {
|
if IsMirrorPod(pod) {
|
||||||
pm.mirrorPodByUID[pod.UID] = pod
|
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
|
||||||
|
pm.mirrorPodByUID[mirrorPodUID] = pod
|
||||||
pm.mirrorPodByFullName[podFullName] = pod
|
pm.mirrorPodByFullName[podFullName] = pod
|
||||||
if p, ok := pm.podByFullName[podFullName]; ok {
|
if p, ok := pm.podByFullName[podFullName]; ok {
|
||||||
pm.translationByUID[pod.UID] = p.UID
|
pm.translationByUID[mirrorPodUID] = kubetypes.ResolvedPodUID(p.UID)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pm.podByUID[pod.UID] = pod
|
resolvedPodUID := kubetypes.ResolvedPodUID(pod.UID)
|
||||||
|
pm.podByUID[resolvedPodUID] = pod
|
||||||
pm.podByFullName[podFullName] = pod
|
pm.podByFullName[podFullName] = pod
|
||||||
if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
|
if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
|
||||||
pm.translationByUID[mirror.UID] = pod.UID
|
pm.translationByUID[kubetypes.MirrorPodUID(mirror.UID)] = resolvedPodUID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -198,12 +203,14 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) {
|
|||||||
pm.configMapManager.UnregisterPod(pod)
|
pm.configMapManager.UnregisterPod(pod)
|
||||||
}
|
}
|
||||||
podFullName := kubecontainer.GetPodFullName(pod)
|
podFullName := kubecontainer.GetPodFullName(pod)
|
||||||
|
// It is safe to type convert here due to the IsMirrorPod guard.
|
||||||
if IsMirrorPod(pod) {
|
if IsMirrorPod(pod) {
|
||||||
delete(pm.mirrorPodByUID, pod.UID)
|
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
|
||||||
|
delete(pm.mirrorPodByUID, mirrorPodUID)
|
||||||
delete(pm.mirrorPodByFullName, podFullName)
|
delete(pm.mirrorPodByFullName, podFullName)
|
||||||
delete(pm.translationByUID, pod.UID)
|
delete(pm.translationByUID, mirrorPodUID)
|
||||||
} else {
|
} else {
|
||||||
delete(pm.podByUID, pod.UID)
|
delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID))
|
||||||
delete(pm.podByFullName, podFullName)
|
delete(pm.podByFullName, podFullName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -218,14 +225,14 @@ func (pm *basicManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) {
|
|||||||
pm.lock.RLock()
|
pm.lock.RLock()
|
||||||
defer pm.lock.RUnlock()
|
defer pm.lock.RUnlock()
|
||||||
pods := podsMapToPods(pm.podByUID)
|
pods := podsMapToPods(pm.podByUID)
|
||||||
mirrorPods := podsMapToPods(pm.mirrorPodByUID)
|
mirrorPods := mirrorPodsMapToMirrorPods(pm.mirrorPodByUID)
|
||||||
return pods, mirrorPods
|
return pods, mirrorPods
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
|
func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
|
||||||
pm.lock.RLock()
|
pm.lock.RLock()
|
||||||
defer pm.lock.RUnlock()
|
defer pm.lock.RUnlock()
|
||||||
pod, ok := pm.podByUID[uid]
|
pod, ok := pm.podByUID[kubetypes.ResolvedPodUID(uid)] // Safe conversion, map only holds non-mirrors.
|
||||||
return pod, ok
|
return pod, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,25 +248,27 @@ func (pm *basicManager) GetPodByFullName(podFullName string) (*v1.Pod, bool) {
|
|||||||
return pod, ok
|
return pod, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID {
|
func (pm *basicManager) TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID {
|
||||||
|
// It is safe to type convert to a resolved UID because type conversion is idempotent.
|
||||||
if uid == "" {
|
if uid == "" {
|
||||||
return uid
|
return kubetypes.ResolvedPodUID(uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
pm.lock.RLock()
|
pm.lock.RLock()
|
||||||
defer pm.lock.RUnlock()
|
defer pm.lock.RUnlock()
|
||||||
if translated, ok := pm.translationByUID[uid]; ok {
|
if translated, ok := pm.translationByUID[kubetypes.MirrorPodUID(uid)]; ok {
|
||||||
return translated
|
return translated
|
||||||
}
|
}
|
||||||
return uid
|
return kubetypes.ResolvedPodUID(uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) {
|
func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID,
|
||||||
|
mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) {
|
||||||
pm.lock.RLock()
|
pm.lock.RLock()
|
||||||
defer pm.lock.RUnlock()
|
defer pm.lock.RUnlock()
|
||||||
|
|
||||||
podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID))
|
podToMirror = make(map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, len(pm.translationByUID))
|
||||||
mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID))
|
mirrorToPod = make(map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID, len(pm.translationByUID))
|
||||||
// Insert empty translation mapping for all static pods.
|
// Insert empty translation mapping for all static pods.
|
||||||
for uid, pod := range pm.podByUID {
|
for uid, pod := range pm.podByUID {
|
||||||
if !IsStaticPod(pod) {
|
if !IsStaticPod(pod) {
|
||||||
@ -309,7 +318,15 @@ func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool {
|
|||||||
return hash == getPodHash(pod)
|
return hash == getPodHash(pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
func podsMapToPods(UIDMap map[types.UID]*v1.Pod) []*v1.Pod {
|
func podsMapToPods(UIDMap map[kubetypes.ResolvedPodUID]*v1.Pod) []*v1.Pod {
|
||||||
|
pods := make([]*v1.Pod, 0, len(UIDMap))
|
||||||
|
for _, pod := range UIDMap {
|
||||||
|
pods = append(pods, pod)
|
||||||
|
}
|
||||||
|
return pods
|
||||||
|
}
|
||||||
|
|
||||||
|
func mirrorPodsMapToMirrorPods(UIDMap map[kubetypes.MirrorPodUID]*v1.Pod) []*v1.Pod {
|
||||||
pods := make([]*v1.Pod, 0, len(UIDMap))
|
pods := make([]*v1.Pod, 0, len(UIDMap))
|
||||||
for _, pod := range UIDMap {
|
for _, pod := range UIDMap {
|
||||||
pods = append(pods, pod)
|
pods = append(pods, pod)
|
||||||
|
@ -97,8 +97,8 @@ func TestGetSetPods(t *testing.T) {
|
|||||||
t.Errorf("pod %q was not found in %#v", expected.UID, actualPods)
|
t.Errorf("pod %q was not found in %#v", expected.UID, actualPods)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Tests UID translation works as expected.
|
// Tests UID translation works as expected. Convert static pod UID for comparison only.
|
||||||
if uid := podManager.TranslatePodUID(mirrorPod.UID); uid != staticPod.UID {
|
if uid := podManager.TranslatePodUID(mirrorPod.UID); uid != kubetypes.ResolvedPodUID(staticPod.UID) {
|
||||||
t.Errorf("unable to translate UID %q to the static POD's UID %q; %#v",
|
t.Errorf("unable to translate UID %q to the static POD's UID %q; %#v",
|
||||||
mirrorPod.UID, staticPod.UID, podManager.mirrorPodByUID)
|
mirrorPod.UID, staticPod.UID, podManager.mirrorPodByUID)
|
||||||
}
|
}
|
||||||
|
@ -66,11 +66,11 @@ type manager struct {
|
|||||||
podStatusChannel chan podStatusSyncRequest
|
podStatusChannel chan podStatusSyncRequest
|
||||||
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
|
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
|
||||||
// apiStatusVersions must only be accessed from the sync thread.
|
// apiStatusVersions must only be accessed from the sync thread.
|
||||||
apiStatusVersions map[types.UID]uint64
|
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
|
||||||
podDeletionSafety PodDeletionSafetyProvider
|
podDeletionSafety PodDeletionSafetyProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
|
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
|
||||||
// that need to introspect status.
|
// that need to introspect status.
|
||||||
type PodStatusProvider interface {
|
type PodStatusProvider interface {
|
||||||
// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
|
// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
|
||||||
@ -78,7 +78,7 @@ type PodStatusProvider interface {
|
|||||||
GetPodStatus(uid types.UID) (v1.PodStatus, bool)
|
GetPodStatus(uid types.UID) (v1.PodStatus, bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// An object which provides guarantees that a pod can be saftely deleted.
|
// An object which provides guarantees that a pod can be safely deleted.
|
||||||
type PodDeletionSafetyProvider interface {
|
type PodDeletionSafetyProvider interface {
|
||||||
// A function which returns true if the pod can safely be deleted
|
// A function which returns true if the pod can safely be deleted
|
||||||
PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool
|
PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool
|
||||||
@ -116,7 +116,7 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
|
|||||||
podManager: podManager,
|
podManager: podManager,
|
||||||
podStatuses: make(map[types.UID]versionedPodStatus),
|
podStatuses: make(map[types.UID]versionedPodStatus),
|
||||||
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
|
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
|
||||||
apiStatusVersions: make(map[types.UID]uint64),
|
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
|
||||||
podDeletionSafety: podDeletionSafety,
|
podDeletionSafety: podDeletionSafety,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -155,7 +155,7 @@ func (m *manager) Start() {
|
|||||||
func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
|
func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
|
||||||
m.podStatusesLock.RLock()
|
m.podStatusesLock.RLock()
|
||||||
defer m.podStatusesLock.RUnlock()
|
defer m.podStatusesLock.RUnlock()
|
||||||
status, ok := m.podStatuses[m.podManager.TranslatePodUID(uid)]
|
status, ok := m.podStatuses[types.UID(m.podManager.TranslatePodUID(uid))]
|
||||||
return status.status, ok
|
return status.status, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,7 +342,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
|
|||||||
default:
|
default:
|
||||||
// Let the periodic syncBatch handle the update if the channel is full.
|
// Let the periodic syncBatch handle the update if the channel is full.
|
||||||
// We can't block, since we hold the mutex lock.
|
// We can't block, since we hold the mutex lock.
|
||||||
glog.V(4).Infof("Skpping the status update for pod %q for now because the channel is full; status: %+v",
|
glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
|
||||||
format.Pod(pod), status)
|
format.Pod(pod), status)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -377,7 +377,7 @@ func (m *manager) syncBatch() {
|
|||||||
|
|
||||||
// Clean up orphaned versions.
|
// Clean up orphaned versions.
|
||||||
for uid := range m.apiStatusVersions {
|
for uid := range m.apiStatusVersions {
|
||||||
_, hasPod := m.podStatuses[uid]
|
_, hasPod := m.podStatuses[types.UID(uid)]
|
||||||
_, hasMirror := mirrorToPod[uid]
|
_, hasMirror := mirrorToPod[uid]
|
||||||
if !hasPod && !hasMirror {
|
if !hasPod && !hasMirror {
|
||||||
delete(m.apiStatusVersions, uid)
|
delete(m.apiStatusVersions, uid)
|
||||||
@ -385,15 +385,15 @@ func (m *manager) syncBatch() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for uid, status := range m.podStatuses {
|
for uid, status := range m.podStatuses {
|
||||||
syncedUID := uid
|
syncedUID := kubetypes.MirrorPodUID(uid)
|
||||||
if mirrorUID, ok := podToMirror[uid]; ok {
|
if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
|
||||||
if mirrorUID == "" {
|
if mirrorUID == "" {
|
||||||
glog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
|
glog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
syncedUID = mirrorUID
|
syncedUID = mirrorUID
|
||||||
}
|
}
|
||||||
if m.needsUpdate(syncedUID, status) {
|
if m.needsUpdate(types.UID(syncedUID), status) {
|
||||||
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
|
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
|
||||||
} else if m.needsReconcile(uid, status.status) {
|
} else if m.needsReconcile(uid, status.status) {
|
||||||
// Delete the apiStatusVersions here to force an update on the pod status
|
// Delete the apiStatusVersions here to force an update on the pod status
|
||||||
@ -433,7 +433,8 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
translatedUID := m.podManager.TranslatePodUID(pod.UID)
|
translatedUID := m.podManager.TranslatePodUID(pod.UID)
|
||||||
if len(translatedUID) > 0 && translatedUID != uid {
|
// Type convert original uid just for the purpose of comparison.
|
||||||
|
if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
|
||||||
glog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
|
glog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
|
||||||
m.deletePodStatus(uid)
|
m.deletePodStatus(uid)
|
||||||
return
|
return
|
||||||
@ -451,7 +452,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
|
|||||||
pod = newPod
|
pod = newPod
|
||||||
|
|
||||||
glog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
|
glog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
|
||||||
m.apiStatusVersions[pod.UID] = status.version
|
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
|
||||||
|
|
||||||
// We don't handle graceful deletion of mirror pods.
|
// We don't handle graceful deletion of mirror pods.
|
||||||
if m.canBeDeleted(pod, status.status) {
|
if m.canBeDeleted(pod, status.status) {
|
||||||
@ -469,9 +470,9 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// needsUpdate returns whether the status is stale for the given pod UID.
|
// needsUpdate returns whether the status is stale for the given pod UID.
|
||||||
// This method is not thread safe, and most only be accessed by the sync thread.
|
// This method is not thread safe, and must only be accessed by the sync thread.
|
||||||
func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
|
func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
|
||||||
latest, ok := m.apiStatusVersions[uid]
|
latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
|
||||||
if !ok || latest < status.version {
|
if !ok || latest < status.version {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -401,7 +401,8 @@ func TestStaleUpdates(t *testing.T) {
|
|||||||
verifyUpdates(t, m, 0)
|
verifyUpdates(t, m, 0)
|
||||||
|
|
||||||
t.Log("... unless it's stale.")
|
t.Log("... unless it's stale.")
|
||||||
m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1
|
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
|
||||||
|
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
|
||||||
|
|
||||||
m.SetPodStatus(pod, status)
|
m.SetPodStatus(pod, status)
|
||||||
m.syncBatch()
|
m.syncBatch()
|
||||||
@ -515,7 +516,7 @@ func TestStaticPod(t *testing.T) {
|
|||||||
t.Logf("Create the mirror pod")
|
t.Logf("Create the mirror pod")
|
||||||
m.podManager.AddPod(mirrorPod)
|
m.podManager.AddPod(mirrorPod)
|
||||||
assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
|
assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
|
||||||
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID)
|
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID))
|
||||||
|
|
||||||
t.Logf("Should be able to get the mirror pod status from status manager")
|
t.Logf("Should be able to get the mirror pod status from status manager")
|
||||||
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
|
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
|
||||||
@ -668,13 +669,13 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("Orphaned pods should be removed.")
|
t.Logf("Orphaned pods should be removed.")
|
||||||
m.apiStatusVersions[testPod.UID] = 100
|
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
|
||||||
m.apiStatusVersions[mirrorPod.UID] = 200
|
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
|
||||||
m.syncBatch()
|
m.syncBatch()
|
||||||
if _, ok := m.apiStatusVersions[testPod.UID]; ok {
|
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok {
|
||||||
t.Errorf("Should have cleared status for testPod")
|
t.Errorf("Should have cleared status for testPod")
|
||||||
}
|
}
|
||||||
if _, ok := m.apiStatusVersions[mirrorPod.UID]; ok {
|
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; ok {
|
||||||
t.Errorf("Should have cleared status for mirrorPod")
|
t.Errorf("Should have cleared status for mirrorPod")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -685,13 +686,13 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
|
|||||||
staticPod.UID = "static-uid"
|
staticPod.UID = "static-uid"
|
||||||
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
|
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
|
||||||
m.podManager.AddPod(staticPod)
|
m.podManager.AddPod(staticPod)
|
||||||
m.apiStatusVersions[testPod.UID] = 100
|
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
|
||||||
m.apiStatusVersions[mirrorPod.UID] = 200
|
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
|
||||||
m.testSyncBatch()
|
m.testSyncBatch()
|
||||||
if _, ok := m.apiStatusVersions[testPod.UID]; !ok {
|
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; !ok {
|
||||||
t.Errorf("Should not have cleared status for testPod")
|
t.Errorf("Should not have cleared status for testPod")
|
||||||
}
|
}
|
||||||
if _, ok := m.apiStatusVersions[mirrorPod.UID]; !ok {
|
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; !ok {
|
||||||
t.Errorf("Should not have cleared status for mirrorPod")
|
t.Errorf("Should not have cleared status for mirrorPod")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -786,7 +787,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
|
|||||||
t.Logf("Verify setup.")
|
t.Logf("Verify setup.")
|
||||||
assert.True(t, kubepod.IsStaticPod(staticPod), "SetUp error: staticPod")
|
assert.True(t, kubepod.IsStaticPod(staticPod), "SetUp error: staticPod")
|
||||||
assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
|
assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
|
||||||
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID)
|
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID))
|
||||||
|
|
||||||
status := getRandomPodStatus()
|
status := getRandomPodStatus()
|
||||||
now := metav1.Now()
|
now := metav1.Now()
|
||||||
|
@ -19,6 +19,7 @@ go_library(
|
|||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: Reconcile custom types in kubelet/types and this subpackage
|
// TODO: Reconcile custom types in kubelet/types and this subpackage
|
||||||
@ -91,3 +92,9 @@ type Reservation struct {
|
|||||||
// Kubernetes represents resources reserved for kubernetes system components.
|
// Kubernetes represents resources reserved for kubernetes system components.
|
||||||
Kubernetes v1.ResourceList
|
Kubernetes v1.ResourceList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A pod UID which has been translated/resolved to the representation known to kubelets.
|
||||||
|
type ResolvedPodUID types.UID
|
||||||
|
|
||||||
|
// A pod UID for a mirror pod.
|
||||||
|
type MirrorPodUID types.UID
|
||||||
|
Loading…
Reference in New Issue
Block a user