Update annotations in the kubelet.

When the pod annotations are updated in the apiserver, update the pod.
Annotations may be used to convey attributes that are required to the
pod execution, such as networking parameters.
This commit is contained in:
Pedro Roque Marques 2015-08-28 15:34:22 -07:00
parent 8d0d54ffed
commit bf170a688c
2 changed files with 96 additions and 5 deletions

View File

@ -333,6 +333,61 @@ func filterInvalidPods(pods []*api.Pod, source string, recorder record.EventReco
return
}
// Annotations that the kubelet adds to the pod.
var localAnnotations = []string{
kubelet.ConfigSourceAnnotationKey,
kubelet.ConfigMirrorAnnotationKey,
kubelet.ConfigFirstSeenAnnotationKey,
}
func isLocalAnnotationKey(key string) bool {
for _, localKey := range localAnnotations {
if key == localKey {
return true
}
}
return false
}
// isAnnotationMapEqual returns true if the existing annotation Map is equal to candidate except
// for local annotations.
func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool {
if candidateMap == nil {
return true
}
for k, v := range candidateMap {
if existingValue, ok := existingMap[k]; ok && existingValue == v {
continue
}
return false
}
for k := range existingMap {
if isLocalAnnotationKey(k) {
continue
}
// stale entry in existing map.
if _, exists := candidateMap[k]; !exists {
return false
}
}
return true
}
// updateAnnotations returns an Annotation map containing the api annotation map plus
// locally managed annotations
func updateAnnotations(existing, ref *api.Pod) {
annotations := make(map[string]string, len(ref.Annotations)+len(localAnnotations))
for k, v := range ref.Annotations {
annotations[k] = v
}
for _, k := range localAnnotations {
if v, ok := existing.Annotations[k]; ok {
annotations[k] = v
}
}
existing.Annotations = annotations
}
// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
// returns false if there was no update.
func checkAndUpdatePod(existing, ref *api.Pod) bool {
@ -340,13 +395,15 @@ func checkAndUpdatePod(existing, ref *api.Pod) bool {
// like the source annotation or the UID (to ensure safety)
if reflect.DeepEqual(existing.Spec, ref.Spec) &&
reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) {
reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) &&
isAnnotationMapEqual(existing.Annotations, ref.Annotations) {
return false
}
// this is an update
existing.Spec = ref.Spec
existing.DeletionTimestamp = ref.DeletionTimestamp
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
updateAnnotations(existing, ref)
return true
}

View File

@ -22,6 +22,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/record"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
@ -55,10 +56,9 @@ func (s sortedPods) Less(i, j int) bool {
func CreateValidPod(name, namespace, source string) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: types.UID(name), // for the purpose of testing, this is unique enough
Name: name,
Namespace: namespace,
Annotations: map[string]string{kubelet.ConfigSourceAnnotationKey: source},
UID: types.UID(name), // for the purpose of testing, this is unique enough
Name: name,
Namespace: namespace,
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
@ -95,9 +95,11 @@ func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kube
// TODO: consider mock out recordFirstSeen in config.go
for _, pod := range update.Pods {
delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey)
delete(pod.Annotations, kubelet.ConfigSourceAnnotationKey)
}
for _, pod := range expected[i].Pods {
delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey)
delete(pod.Annotations, kubelet.ConfigSourceAnnotationKey)
}
if !api.Semantic.DeepEqual(expected[i], update) {
t.Fatalf("Expected %#v, Got %#v", expected[i], update)
@ -259,3 +261,35 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
}
func TestPodUpdateAnnotations(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
pod := CreateValidPod("foo2", "new", "test")
pod.Annotations = make(map[string]string, 0)
pod.Annotations["kubernetes.io/blah"] = "blah"
clone, err := conversion.NewCloner().DeepCopy(pod)
if err != nil {
t.Fatalf("%v", err)
}
podUpdate := CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), clone.(*api.Pod), CreateValidPod("foo3", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test")))
pod.Annotations["kubenetes.io/blah"] = "superblah"
podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
pod.Annotations["kubernetes.io/otherblah"] = "doh"
podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
delete(pod.Annotations, "kubernetes.io/blah")
podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
}