From bf170a688ca51b9bc011760ba1ca59aae6d1e838 Mon Sep 17 00:00:00 2001 From: Pedro Roque Marques Date: Fri, 28 Aug 2015 15:34:22 -0700 Subject: [PATCH] Update annotations in the kubelet. When the pod annotations are updated in the apiserver, update the pod. Annotations may be used to convey attributes that are required to the pod execution, such as networking parameters. --- pkg/kubelet/config/config.go | 59 ++++++++++++++++++++++++++++++- pkg/kubelet/config/config_test.go | 42 +++++++++++++++++++--- 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 6b0736d3b54..0de727354bd 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -333,6 +333,61 @@ func filterInvalidPods(pods []*api.Pod, source string, recorder record.EventReco return } +// Annotations that the kubelet adds to the pod. +var localAnnotations = []string{ + kubelet.ConfigSourceAnnotationKey, + kubelet.ConfigMirrorAnnotationKey, + kubelet.ConfigFirstSeenAnnotationKey, +} + +func isLocalAnnotationKey(key string) bool { + for _, localKey := range localAnnotations { + if key == localKey { + return true + } + } + return false +} + +// isAnnotationMapEqual returns true if the existing annotation Map is equal to candidate except +// for local annotations. +func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool { + if candidateMap == nil { + return true + } + for k, v := range candidateMap { + if existingValue, ok := existingMap[k]; ok && existingValue == v { + continue + } + return false + } + for k := range existingMap { + if isLocalAnnotationKey(k) { + continue + } + // stale entry in existing map. + if _, exists := candidateMap[k]; !exists { + return false + } + } + return true +} + +// updateAnnotations returns an Annotation map containing the api annotation map plus +// locally managed annotations +func updateAnnotations(existing, ref *api.Pod) { + annotations := make(map[string]string, len(ref.Annotations)+len(localAnnotations)) + for k, v := range ref.Annotations { + annotations[k] = v + } + for _, k := range localAnnotations { + if v, ok := existing.Annotations[k]; ok { + annotations[k] = v + } + } + existing.Annotations = annotations +} + // checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or // returns false if there was no update. func checkAndUpdatePod(existing, ref *api.Pod) bool { @@ -340,13 +395,15 @@ func checkAndUpdatePod(existing, ref *api.Pod) bool { // like the source annotation or the UID (to ensure safety) if reflect.DeepEqual(existing.Spec, ref.Spec) && reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) && - reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) { + reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) && + isAnnotationMapEqual(existing.Annotations, ref.Annotations) { return false } // this is an update existing.Spec = ref.Spec existing.DeletionTimestamp = ref.DeletionTimestamp existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds + updateAnnotations(existing, ref) return true } diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index c213e07d618..c3d39202f8f 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -22,6 +22,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/unversioned/record" + "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" @@ -55,10 +56,9 @@ func (s sortedPods) Less(i, j int) bool { func CreateValidPod(name, namespace, source string) *api.Pod { return &api.Pod{ ObjectMeta: api.ObjectMeta{ - UID: types.UID(name), // for the purpose of testing, this is unique enough - Name: name, - Namespace: namespace, - Annotations: map[string]string{kubelet.ConfigSourceAnnotationKey: source}, + UID: types.UID(name), // for the purpose of testing, this is unique enough + Name: name, + Namespace: namespace, }, Spec: api.PodSpec{ RestartPolicy: api.RestartPolicyAlways, @@ -95,9 +95,11 @@ func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kube // TODO: consider mock out recordFirstSeen in config.go for _, pod := range update.Pods { delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey) + delete(pod.Annotations, kubelet.ConfigSourceAnnotationKey) } for _, pod := range expected[i].Pods { delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey) + delete(pod.Annotations, kubelet.ConfigSourceAnnotationKey) } if !api.Semantic.DeepEqual(expected[i], update) { t.Fatalf("Expected %#v, Got %#v", expected[i], update) @@ -259,3 +261,35 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo4", "new", "test")), CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) } + +func TestPodUpdateAnnotations(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + pod := CreateValidPod("foo2", "new", "test") + pod.Annotations = make(map[string]string, 0) + pod.Annotations["kubernetes.io/blah"] = "blah" + + clone, err := conversion.NewCloner().DeepCopy(pod) + if err != nil { + t.Fatalf("%v", err) + } + + podUpdate := CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), clone.(*api.Pod), CreateValidPod("foo3", "new", "test")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test"))) + + pod.Annotations["kubenetes.io/blah"] = "superblah" + podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) + + pod.Annotations["kubernetes.io/otherblah"] = "doh" + podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) + + delete(pod.Annotations, "kubernetes.io/blah") + podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) +}