diff --git a/pkg/kubelet/kubelet_cadvisor.go b/pkg/kubelet/kubelet_cadvisor.go index 973c848320a..2ff74f03cbc 100644 --- a/pkg/kubelet/kubelet_cadvisor.go +++ b/pkg/kubelet/kubelet_cadvisor.go @@ -26,7 +26,9 @@ import ( // GetContainerInfo returns stats (from Cadvisor) for a container. func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) { - podUID = kl.podManager.TranslatePodUID(podUID) + // Resolve and type convert back again. + // We need the static pod UID but the kubecontainer API works with types.UID. + podUID = types.UID(kl.podManager.TranslatePodUID(podUID)) pods, err := kl.runtimeCache.GetPods() if err != nil { diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index c7588e30f91..86b004565c0 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1424,7 +1424,9 @@ func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, container if err != nil { return nil, err } - podUID = kl.podManager.TranslatePodUID(podUID) + // Resolve and type convert back again. + // We need the static pod UID but the kubecontainer API works with types.UID. + podUID = types.UID(kl.podManager.TranslatePodUID(podUID)) pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID) return pod.FindContainerByName(containerName), nil } @@ -1490,7 +1492,9 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, if err != nil { return err } - podUID = kl.podManager.TranslatePodUID(podUID) + // Resolve and type convert back again. + // We need the static pod UID but the kubecontainer API works with types.UID. + podUID = types.UID(kl.podManager.TranslatePodUID(podUID)) pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID) if pod.IsEmpty() { return fmt.Errorf("pod not found (%q)", podFullName) @@ -1563,7 +1567,9 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID if err != nil { return nil, err } - podUID = kl.podManager.TranslatePodUID(podUID) + // Resolve and type convert back again. + // We need the static pod UID but the kubecontainer API works with types.UID. + podUID = types.UID(kl.podManager.TranslatePodUID(podUID)) podFullName := kubecontainer.BuildPodFullName(podName, podNamespace) pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID) if pod.IsEmpty() { diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 6f2f6686ddb..20940a7dc5b 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/secret" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) // Manager stores and manages access to pods, maintaining the mappings @@ -44,7 +45,7 @@ import ( type Manager interface { // GetPods returns the regular pods bound to the kubelet and their spec. GetPods() []*v1.Pod - // GetPodByName returns the (non-mirror) pod that matches full name, as well as + // GetPodByFullName returns the (non-mirror) pod that matches full name, as well as // whether the pod was found. GetPodByFullName(podFullName string) (*v1.Pod, bool) // GetPodByName provides the (non-mirror) pod that matches namespace and @@ -83,10 +84,10 @@ type Manager interface { // All public-facing functions should perform this translation for UIDs // because user may provide a mirror pod UID, which is not recognized by // internal Kubelet functions. - TranslatePodUID(uid types.UID) types.UID + TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod // UIDs and mirror pod UIDs to static pod UIDs. - GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) + GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) // IsMirrorPodOf returns true if mirrorPod is a correct representation of // pod; false otherwise. IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool @@ -94,7 +95,7 @@ type Manager interface { MirrorClient } -// basicManager is a functional Manger. +// basicManager is a functional Manager. // // All fields in basicManager are read-only and are updated calling SetPods, // AddPod, UpdatePod, or DeletePod. @@ -103,16 +104,16 @@ type basicManager struct { lock sync.RWMutex // Regular pods indexed by UID. - podByUID map[types.UID]*v1.Pod + podByUID map[kubetypes.ResolvedPodUID]*v1.Pod // Mirror pods indexed by UID. - mirrorPodByUID map[types.UID]*v1.Pod + mirrorPodByUID map[kubetypes.MirrorPodUID]*v1.Pod // Pods indexed by full name for easy access. podByFullName map[string]*v1.Pod mirrorPodByFullName map[string]*v1.Pod // Mirror pod UID to pod UID map. - translationByUID map[types.UID]types.UID + translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID // basicManager is keeping secretManager and configMapManager up-to-date. secretManager secret.Manager @@ -137,11 +138,11 @@ func (pm *basicManager) SetPods(newPods []*v1.Pod) { pm.lock.Lock() defer pm.lock.Unlock() - pm.podByUID = make(map[types.UID]*v1.Pod) + pm.podByUID = make(map[kubetypes.ResolvedPodUID]*v1.Pod) pm.podByFullName = make(map[string]*v1.Pod) - pm.mirrorPodByUID = make(map[types.UID]*v1.Pod) + pm.mirrorPodByUID = make(map[kubetypes.MirrorPodUID]*v1.Pod) pm.mirrorPodByFullName = make(map[string]*v1.Pod) - pm.translationByUID = make(map[types.UID]types.UID) + pm.translationByUID = make(map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) pm.updatePodsInternal(newPods...) } @@ -157,7 +158,7 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) { } // updatePodsInternal replaces the given pods in the current state of the -// manager, updating the various indices. The caller is assumed to hold the +// manager, updating the various indices. The caller is assumed to hold the // lock. func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { for _, pod := range pods { @@ -172,17 +173,21 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { pm.configMapManager.RegisterPod(pod) } podFullName := kubecontainer.GetPodFullName(pod) + // This logic relies on a static pod and its mirror to have the same name. + // It is safe to type convert here due to the IsMirrorPod guard. if IsMirrorPod(pod) { - pm.mirrorPodByUID[pod.UID] = pod + mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) + pm.mirrorPodByUID[mirrorPodUID] = pod pm.mirrorPodByFullName[podFullName] = pod if p, ok := pm.podByFullName[podFullName]; ok { - pm.translationByUID[pod.UID] = p.UID + pm.translationByUID[mirrorPodUID] = kubetypes.ResolvedPodUID(p.UID) } } else { - pm.podByUID[pod.UID] = pod + resolvedPodUID := kubetypes.ResolvedPodUID(pod.UID) + pm.podByUID[resolvedPodUID] = pod pm.podByFullName[podFullName] = pod if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok { - pm.translationByUID[mirror.UID] = pod.UID + pm.translationByUID[kubetypes.MirrorPodUID(mirror.UID)] = resolvedPodUID } } } @@ -198,12 +203,14 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) { pm.configMapManager.UnregisterPod(pod) } podFullName := kubecontainer.GetPodFullName(pod) + // It is safe to type convert here due to the IsMirrorPod guard. if IsMirrorPod(pod) { - delete(pm.mirrorPodByUID, pod.UID) + mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) + delete(pm.mirrorPodByUID, mirrorPodUID) delete(pm.mirrorPodByFullName, podFullName) - delete(pm.translationByUID, pod.UID) + delete(pm.translationByUID, mirrorPodUID) } else { - delete(pm.podByUID, pod.UID) + delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID)) delete(pm.podByFullName, podFullName) } } @@ -218,14 +225,14 @@ func (pm *basicManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) { pm.lock.RLock() defer pm.lock.RUnlock() pods := podsMapToPods(pm.podByUID) - mirrorPods := podsMapToPods(pm.mirrorPodByUID) + mirrorPods := mirrorPodsMapToMirrorPods(pm.mirrorPodByUID) return pods, mirrorPods } func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) { pm.lock.RLock() defer pm.lock.RUnlock() - pod, ok := pm.podByUID[uid] + pod, ok := pm.podByUID[kubetypes.ResolvedPodUID(uid)] // Safe conversion, map only holds non-mirrors. return pod, ok } @@ -241,25 +248,27 @@ func (pm *basicManager) GetPodByFullName(podFullName string) (*v1.Pod, bool) { return pod, ok } -func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID { +func (pm *basicManager) TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID { + // It is safe to type convert to a resolved UID because type conversion is idempotent. if uid == "" { - return uid + return kubetypes.ResolvedPodUID(uid) } pm.lock.RLock() defer pm.lock.RUnlock() - if translated, ok := pm.translationByUID[uid]; ok { + if translated, ok := pm.translationByUID[kubetypes.MirrorPodUID(uid)]; ok { return translated } - return uid + return kubetypes.ResolvedPodUID(uid) } -func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) { +func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, + mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) { pm.lock.RLock() defer pm.lock.RUnlock() - podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID)) - mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID)) + podToMirror = make(map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, len(pm.translationByUID)) + mirrorToPod = make(map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID, len(pm.translationByUID)) // Insert empty translation mapping for all static pods. for uid, pod := range pm.podByUID { if !IsStaticPod(pod) { @@ -309,7 +318,15 @@ func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool { return hash == getPodHash(pod) } -func podsMapToPods(UIDMap map[types.UID]*v1.Pod) []*v1.Pod { +func podsMapToPods(UIDMap map[kubetypes.ResolvedPodUID]*v1.Pod) []*v1.Pod { + pods := make([]*v1.Pod, 0, len(UIDMap)) + for _, pod := range UIDMap { + pods = append(pods, pod) + } + return pods +} + +func mirrorPodsMapToMirrorPods(UIDMap map[kubetypes.MirrorPodUID]*v1.Pod) []*v1.Pod { pods := make([]*v1.Pod, 0, len(UIDMap)) for _, pod := range UIDMap { pods = append(pods, pod) diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 84baa026a81..557a5927f3e 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -97,8 +97,8 @@ func TestGetSetPods(t *testing.T) { t.Errorf("pod %q was not found in %#v", expected.UID, actualPods) } } - // Tests UID translation works as expected. - if uid := podManager.TranslatePodUID(mirrorPod.UID); uid != staticPod.UID { + // Tests UID translation works as expected. Convert static pod UID for comparison only. + if uid := podManager.TranslatePodUID(mirrorPod.UID); uid != kubetypes.ResolvedPodUID(staticPod.UID) { t.Errorf("unable to translate UID %q to the static POD's UID %q; %#v", mirrorPod.UID, staticPod.UID, podManager.mirrorPodByUID) } diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index e2cc46a64e6..e6483a0ade4 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -66,11 +66,11 @@ type manager struct { podStatusChannel chan podStatusSyncRequest // Map from (mirror) pod UID to latest status version successfully sent to the API server. // apiStatusVersions must only be accessed from the sync thread. - apiStatusVersions map[types.UID]uint64 + apiStatusVersions map[kubetypes.MirrorPodUID]uint64 podDeletionSafety PodDeletionSafetyProvider } -// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components +// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components // that need to introspect status. type PodStatusProvider interface { // GetPodStatus returns the cached status for the provided pod UID, as well as whether it @@ -78,7 +78,7 @@ type PodStatusProvider interface { GetPodStatus(uid types.UID) (v1.PodStatus, bool) } -// An object which provides guarantees that a pod can be saftely deleted. +// An object which provides guarantees that a pod can be safely deleted. type PodDeletionSafetyProvider interface { // A function which returns true if the pod can safely be deleted PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool @@ -116,7 +116,7 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD podManager: podManager, podStatuses: make(map[types.UID]versionedPodStatus), podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses - apiStatusVersions: make(map[types.UID]uint64), + apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64), podDeletionSafety: podDeletionSafety, } } @@ -155,7 +155,7 @@ func (m *manager) Start() { func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() - status, ok := m.podStatuses[m.podManager.TranslatePodUID(uid)] + status, ok := m.podStatuses[types.UID(m.podManager.TranslatePodUID(uid))] return status.status, ok } @@ -342,7 +342,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp default: // Let the periodic syncBatch handle the update if the channel is full. // We can't block, since we hold the mutex lock. - glog.V(4).Infof("Skpping the status update for pod %q for now because the channel is full; status: %+v", + glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v", format.Pod(pod), status) return false } @@ -377,7 +377,7 @@ func (m *manager) syncBatch() { // Clean up orphaned versions. for uid := range m.apiStatusVersions { - _, hasPod := m.podStatuses[uid] + _, hasPod := m.podStatuses[types.UID(uid)] _, hasMirror := mirrorToPod[uid] if !hasPod && !hasMirror { delete(m.apiStatusVersions, uid) @@ -385,15 +385,15 @@ func (m *manager) syncBatch() { } for uid, status := range m.podStatuses { - syncedUID := uid - if mirrorUID, ok := podToMirror[uid]; ok { + syncedUID := kubetypes.MirrorPodUID(uid) + if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok { if mirrorUID == "" { glog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace) continue } syncedUID = mirrorUID } - if m.needsUpdate(syncedUID, status) { + if m.needsUpdate(types.UID(syncedUID), status) { updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) } else if m.needsReconcile(uid, status.status) { // Delete the apiStatusVersions here to force an update on the pod status @@ -433,7 +433,8 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { } translatedUID := m.podManager.TranslatePodUID(pod.UID) - if len(translatedUID) > 0 && translatedUID != uid { + // Type convert original uid just for the purpose of comparison. + if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) { glog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID) m.deletePodStatus(uid) return @@ -451,7 +452,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { pod = newPod glog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status) - m.apiStatusVersions[pod.UID] = status.version + m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version // We don't handle graceful deletion of mirror pods. if m.canBeDeleted(pod, status.status) { @@ -469,9 +470,9 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { } // needsUpdate returns whether the status is stale for the given pod UID. -// This method is not thread safe, and most only be accessed by the sync thread. +// This method is not thread safe, and must only be accessed by the sync thread. func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool { - latest, ok := m.apiStatusVersions[uid] + latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)] if !ok || latest < status.version { return true } diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 4e986c4f459..23148065c49 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -401,7 +401,8 @@ func TestStaleUpdates(t *testing.T) { verifyUpdates(t, m, 0) t.Log("... unless it's stale.") - m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1 + mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) + m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 m.SetPodStatus(pod, status) m.syncBatch() @@ -515,7 +516,7 @@ func TestStaticPod(t *testing.T) { t.Logf("Create the mirror pod") m.podManager.AddPod(mirrorPod) assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") - assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID) + assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID)) t.Logf("Should be able to get the mirror pod status from status manager") retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) @@ -668,13 +669,13 @@ func TestSyncBatchCleanupVersions(t *testing.T) { } t.Logf("Orphaned pods should be removed.") - m.apiStatusVersions[testPod.UID] = 100 - m.apiStatusVersions[mirrorPod.UID] = 200 + m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 + m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 m.syncBatch() - if _, ok := m.apiStatusVersions[testPod.UID]; ok { + if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok { t.Errorf("Should have cleared status for testPod") } - if _, ok := m.apiStatusVersions[mirrorPod.UID]; ok { + if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; ok { t.Errorf("Should have cleared status for mirrorPod") } @@ -685,13 +686,13 @@ func TestSyncBatchCleanupVersions(t *testing.T) { staticPod.UID = "static-uid" staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} m.podManager.AddPod(staticPod) - m.apiStatusVersions[testPod.UID] = 100 - m.apiStatusVersions[mirrorPod.UID] = 200 + m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 + m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 m.testSyncBatch() - if _, ok := m.apiStatusVersions[testPod.UID]; !ok { + if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; !ok { t.Errorf("Should not have cleared status for testPod") } - if _, ok := m.apiStatusVersions[mirrorPod.UID]; !ok { + if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; !ok { t.Errorf("Should not have cleared status for mirrorPod") } } @@ -786,7 +787,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { t.Logf("Verify setup.") assert.True(t, kubepod.IsStaticPod(staticPod), "SetUp error: staticPod") assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") - assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID) + assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID)) status := getRandomPodStatus() now := metav1.Now() diff --git a/pkg/kubelet/types/BUILD b/pkg/kubelet/types/BUILD index c44dfba7164..296097268d7 100644 --- a/pkg/kubelet/types/BUILD +++ b/pkg/kubelet/types/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/api:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", ], ) diff --git a/pkg/kubelet/types/types.go b/pkg/kubelet/types/types.go index f356d782b5a..b0dff97a78a 100644 --- a/pkg/kubelet/types/types.go +++ b/pkg/kubelet/types/types.go @@ -21,6 +21,7 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" ) // TODO: Reconcile custom types in kubelet/types and this subpackage @@ -91,3 +92,9 @@ type Reservation struct { // Kubernetes represents resources reserved for kubernetes system components. Kubernetes v1.ResourceList } + +// A pod UID which has been translated/resolved to the representation known to kubelets. +type ResolvedPodUID types.UID + +// A pod UID for a mirror pod. +type MirrorPodUID types.UID