Merge pull request #43414 from Random-Liu/use-uid-in-config

Automatic merge from submit-queue

Use uid in config.go instead of pod full name.

For https://github.com/kubernetes/kubernetes/issues/43397.

In config.go, use pod uid in pod cache.

Previously, if we update the static pod, even though a new UID is generated in file.go, config.go will only reference the pod with pod full name, and never update the pod UID in the internal cache. This causes:
1) If we change container spec, kubelet will restart the corresponding container because the container hash is changed.
2) If we change pod spec, kubelet will do nothing.

With this fix, kubelet will always restart pod whenever pod spec (including container spec) is changed.

@yujuhong @bowei @dchen1107 
/cc @kubernetes/sig-node-bugs
This commit is contained in:
Kubernetes Submit Queue 2017-03-20 18:18:36 -07:00 committed by GitHub
commit e09fda63be
2 changed files with 20 additions and 35 deletions

View File

@ -22,6 +22,7 @@ import (
"sync" "sync"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
@ -117,8 +118,8 @@ func (c *PodConfig) Sync() {
// available, then this object should be considered authoritative. // available, then this object should be considered authoritative.
type podStorage struct { type podStorage struct {
podLock sync.RWMutex podLock sync.RWMutex
// map of source name to pod name to pod reference // map of source name to pod uid to pod reference
pods map[string]map[string]*v1.Pod pods map[string]map[types.UID]*v1.Pod
mode PodConfigNotificationMode mode PodConfigNotificationMode
// ensures that updates are delivered in strict order // ensures that updates are delivered in strict order
@ -139,7 +140,7 @@ type podStorage struct {
// TODO: allow initialization of the current state of the store with snapshotted version. // TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage { func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
return &podStorage{ return &podStorage{
pods: make(map[string]map[string]*v1.Pod), pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode, mode: mode,
updates: updates, updates: updates,
sourcesSeen: sets.String{}, sourcesSeen: sets.String{},
@ -221,23 +222,22 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
pods := s.pods[source] pods := s.pods[source]
if pods == nil { if pods == nil {
pods = make(map[string]*v1.Pod) pods = make(map[types.UID]*v1.Pod)
} }
// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*. // updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
// After updated, new pod will be stored in the pod cache *pods*. // After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache. // Notice that *pods* and *oldPods* could be the same cache.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[string]*v1.Pod) { updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder) filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered { for _, ref := range filtered {
name := kubecontainer.GetPodFullName(ref)
// Annotate the pod with the source before any comparison. // Annotate the pod with the source before any comparison.
if ref.Annotations == nil { if ref.Annotations == nil {
ref.Annotations = make(map[string]string) ref.Annotations = make(map[string]string)
} }
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[name]; found { if existing, found := oldPods[ref.UID]; found {
pods[name] = existing pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref) needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate { if needUpdate {
updatePods = append(updatePods, existing) updatePods = append(updatePods, existing)
@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
continue continue
} }
recordFirstSeenTime(ref) recordFirstSeenTime(ref)
pods[name] = ref pods[ref.UID] = ref
addPods = append(addPods, ref) addPods = append(addPods, ref)
} }
} }
@ -280,10 +280,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
case kubetypes.REMOVE: case kubetypes.REMOVE:
glog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods) glog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
for _, value := range update.Pods { for _, value := range update.Pods {
name := kubecontainer.GetPodFullName(value) if existing, found := pods[value.UID]; found {
if existing, found := pods[name]; found {
// this is a delete // this is a delete
delete(pods, name) delete(pods, value.UID)
removePods = append(removePods, existing) removePods = append(removePods, existing)
continue continue
} }
@ -295,10 +294,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
s.markSourceSet(source) s.markSourceSet(source)
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
oldPods := pods oldPods := pods
pods = make(map[string]*v1.Pod) pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods) updatePodsFunc(update.Pods, oldPods, pods)
for name, existing := range oldPods { for uid, existing := range oldPods {
if _, found := pods[name]; !found { if _, found := pods[uid]; !found {
// this is a delete // this is a delete
removePods = append(removePods, existing) removePods = append(removePods, existing)
} }
@ -339,9 +338,8 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor
// TODO: remove the conversion when validation is performed on versioned objects. // TODO: remove the conversion when validation is performed on versioned objects.
internalPod := &api.Pod{} internalPod := &api.Pod{}
if err := v1.Convert_v1_Pod_To_api_Pod(pod, internalPod, nil); err != nil { if err := v1.Convert_v1_Pod_To_api_Pod(pod, internalPod, nil); err != nil {
name := kubecontainer.GetPodFullName(pod) glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, format.Pod(pod), source, err)
glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, name, source, err) recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", format.Pod(pod), source, err)
recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", name, source, err)
continue continue
} }
if errs := validation.ValidatePod(internalPod); len(errs) != 0 { if errs := validation.ValidatePod(internalPod); len(errs) != 0 {
@ -359,10 +357,9 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor
} }
} }
if len(errlist) > 0 { if len(errlist) > 0 {
name := bestPodIdentString(pod)
err := errlist.ToAggregate() err := errlist.ToAggregate()
glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, name, source, err) glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, format.Pod(pod), source, err)
recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", name, source, err) recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", format.Pod(pod), source, err)
continue continue
} }
filtered = append(filtered, pod) filtered = append(filtered, pod)
@ -515,18 +512,6 @@ func (s *podStorage) MergedState() interface{} {
return pods return pods
} }
func bestPodIdentString(pod *v1.Pod) string {
namespace := pod.Namespace
if namespace == "" {
namespace = "<empty-namespace>"
}
name := pod.Name
if name == "" {
name = "<empty-name>"
}
return fmt.Sprintf("%s.%s", name, namespace)
}
func copyPods(sourcePods []*v1.Pod) []*v1.Pod { func copyPods(sourcePods []*v1.Pod) []*v1.Pod {
pods := []*v1.Pod{} pods := []*v1.Pod{}
for _, source := range sourcePods { for _, source := range sourcePods {

View File

@ -62,7 +62,7 @@ func (s sortedPods) Less(i, j int) bool {
func CreateValidPod(name, namespace string) *v1.Pod { func CreateValidPod(name, namespace string) *v1.Pod {
return &v1.Pod{ return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
UID: types.UID(name), // for the purpose of testing, this is unique enough UID: types.UID(name + namespace), // for the purpose of testing, this is unique enough
Name: name, Name: name,
Namespace: namespace, Namespace: namespace,
}, },
@ -248,7 +248,7 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "new"}}) podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new"))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod)) expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
} }