From fbc320af28a11c09d487f9b7cea43fe8f8b224c6 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Mon, 20 Mar 2017 15:52:29 -0700 Subject: [PATCH] Use uid in config.go instead of pod full name. --- pkg/kubelet/config/config.go | 51 +++++++++++-------------------- pkg/kubelet/config/config_test.go | 4 +-- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 6507f860f95..f08039874ec 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/client-go/tools/record" @@ -117,8 +118,8 @@ func (c *PodConfig) Sync() { // available, then this object should be considered authoritative. type podStorage struct { podLock sync.RWMutex - // map of source name to pod name to pod reference - pods map[string]map[string]*v1.Pod + // map of source name to pod uid to pod reference + pods map[string]map[types.UID]*v1.Pod mode PodConfigNotificationMode // ensures that updates are delivered in strict order @@ -139,7 +140,7 @@ type podStorage struct { // TODO: allow initialization of the current state of the store with snapshotted version. func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage { return &podStorage{ - pods: make(map[string]map[string]*v1.Pod), + pods: make(map[string]map[types.UID]*v1.Pod), mode: mode, updates: updates, sourcesSeen: sets.String{}, @@ -221,23 +222,22 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de pods := s.pods[source] if pods == nil { - pods = make(map[string]*v1.Pod) + pods = make(map[types.UID]*v1.Pod) } // updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*. // After updated, new pod will be stored in the pod cache *pods*. // Notice that *pods* and *oldPods* could be the same cache. - updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[string]*v1.Pod) { + updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) { filtered := filterInvalidPods(newPods, source, s.recorder) for _, ref := range filtered { - name := kubecontainer.GetPodFullName(ref) // Annotate the pod with the source before any comparison. if ref.Annotations == nil { ref.Annotations = make(map[string]string) } ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source - if existing, found := oldPods[name]; found { - pods[name] = existing + if existing, found := oldPods[ref.UID]; found { + pods[ref.UID] = existing needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref) if needUpdate { updatePods = append(updatePods, existing) @@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de continue } recordFirstSeenTime(ref) - pods[name] = ref + pods[ref.UID] = ref addPods = append(addPods, ref) } } @@ -280,10 +280,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de case kubetypes.REMOVE: glog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods) for _, value := range update.Pods { - name := kubecontainer.GetPodFullName(value) - if existing, found := pods[name]; found { + if existing, found := pods[value.UID]; found { // this is a delete - delete(pods, name) + delete(pods, value.UID) removePods = append(removePods, existing) continue } @@ -295,10 +294,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de s.markSourceSet(source) // Clear the old map entries by just creating a new map oldPods := pods - pods = make(map[string]*v1.Pod) + pods = make(map[types.UID]*v1.Pod) updatePodsFunc(update.Pods, oldPods, pods) - for name, existing := range oldPods { - if _, found := pods[name]; !found { + for uid, existing := range oldPods { + if _, found := pods[uid]; !found { // this is a delete removePods = append(removePods, existing) } @@ -339,9 +338,8 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor // TODO: remove the conversion when validation is performed on versioned objects. internalPod := &api.Pod{} if err := v1.Convert_v1_Pod_To_api_Pod(pod, internalPod, nil); err != nil { - name := kubecontainer.GetPodFullName(pod) - glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, name, source, err) - recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", name, source, err) + glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, format.Pod(pod), source, err) + recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", format.Pod(pod), source, err) continue } if errs := validation.ValidatePod(internalPod); len(errs) != 0 { @@ -359,10 +357,9 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor } } if len(errlist) > 0 { - name := bestPodIdentString(pod) err := errlist.ToAggregate() - glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, name, source, err) - recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", name, source, err) + glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, format.Pod(pod), source, err) + recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", format.Pod(pod), source, err) continue } filtered = append(filtered, pod) @@ -515,18 +512,6 @@ func (s *podStorage) MergedState() interface{} { return pods } -func bestPodIdentString(pod *v1.Pod) string { - namespace := pod.Namespace - if namespace == "" { - namespace = "" - } - name := pod.Name - if name == "" { - name = "" - } - return fmt.Sprintf("%s.%s", name, namespace) -} - func copyPods(sourcePods []*v1.Pod) []*v1.Pod { pods := []*v1.Pod{} for _, source := range sourcePods { diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 50c14d7f092..76add8d34c7 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -62,7 +62,7 @@ func (s sortedPods) Less(i, j int) bool { func CreateValidPod(name, namespace string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - UID: types.UID(name), // for the purpose of testing, this is unique enough + UID: types.UID(name + namespace), // for the purpose of testing, this is unique enough Name: name, Namespace: namespace, }, @@ -248,7 +248,7 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) - podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "new"}}) + podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod)) }