diff --git a/contrib/mesos/pkg/executor/service/kubelet.go b/contrib/mesos/pkg/executor/service/kubelet.go index f4525294c51..7db45b4db4b 100644 --- a/contrib/mesos/pkg/executor/service/kubelet.go +++ b/contrib/mesos/pkg/executor/service/kubelet.go @@ -80,5 +80,5 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) { //TODO(jdef) revisit this if/when executor failover lands // Force kubelet to delete all pods. - kl.HandlePodDeletions(kl.GetPods()) + kl.HandlePodRemoves(kl.GetPods()) } diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 0d190e01e04..265072f9c5a 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -43,10 +43,10 @@ const ( // PodConfigNotificationSnapshot delivers the full configuration as a SET whenever // any change occurs. PodConfigNotificationSnapshot - // PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are + // PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are // changed, and a SET message if there are any additions or removals. PodConfigNotificationSnapshotAndUpdates - // PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel. + // PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel. 
PodConfigNotificationIncremental ) @@ -152,14 +152,14 @@ func (s *podStorage) Merge(source string, change interface{}) error { defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source) - adds, updates, deletes, reconciles := s.merge(source, change) + adds, updates, deletes, removes, reconciles := s.merge(source, change) firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications switch s.mode { case PodConfigNotificationIncremental: - if len(deletes.Pods) > 0 { - s.updates <- *deletes + if len(removes.Pods) > 0 { + s.updates <- *removes } if len(adds.Pods) > 0 { s.updates <- *adds @@ -167,9 +167,12 @@ func (s *podStorage) Merge(source string, change interface{}) error { if len(updates.Pods) > 0 { s.updates <- *updates } - if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 { + if len(deletes.Pods) > 0 { + s.updates <- *deletes + } + if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 { // Send an empty update when first seeing the source and there are - // no ADD or UPDATE pods from the source. This signals kubelet that + // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that // the source is ready. 
s.updates <- *adds } @@ -179,15 +182,18 @@ func (s *podStorage) Merge(source string, change interface{}) error { } case PodConfigNotificationSnapshotAndUpdates: - if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { + if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} } if len(updates.Pods) > 0 { s.updates <- *updates } + if len(deletes.Pods) > 0 { + s.updates <- *deletes + } case PodConfigNotificationSnapshot: - if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { + if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet { s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} } @@ -200,13 +206,14 @@ func (s *podStorage) Merge(source string, change interface{}) error { return nil } -func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) { +func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { s.podLock.Lock() defer s.podLock.Unlock() addPods := []*api.Pod{} updatePods := []*api.Pod{} deletePods := []*api.Pod{} + removePods := []*api.Pod{} reconcilePods := []*api.Pod{} pods := s.pods[source] @@ -228,11 +235,13 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source if existing, found := oldPods[name]; found { pods[name] = existing - needUpdate, needReconcile := checkAndUpdatePod(existing, ref) + needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref) if needUpdate { updatePods = append(updatePods, existing) } else if needReconcile { reconcilePods = append(reconcilePods, existing) + } else if needGracefulDelete { + deletePods = append(deletePods, existing) } 
continue } @@ -244,9 +253,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de update := change.(kubetypes.PodUpdate) switch update.Op { - case kubetypes.ADD, kubetypes.UPDATE: + case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE: if update.Op == kubetypes.ADD { glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods) + } else if update.Op == kubetypes.DELETE { + glog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods) } else { glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods) } @@ -259,7 +270,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de if existing, found := pods[name]; found { // this is a delete delete(pods, name) - deletePods = append(deletePods, existing) + removePods = append(removePods, existing) continue } // this is a no-op @@ -275,7 +286,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de for name, existing := range oldPods { if _, found := pods[name]; !found { // this is a delete - deletePods = append(deletePods, existing) + removePods = append(removePods, existing) } } @@ -288,10 +299,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source} updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source} - deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source} + deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source} + removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source} reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source} - return adds, updates, deletes, reconciles + return adds, updates, deletes, removes, reconciles } func (s *podStorage) 
markSourceSet(source string) { @@ -413,10 +425,13 @@ func podsDifferSemantically(existing, ref *api.Pod) bool { // checkAndUpdatePod updates existing, and: // * if ref makes a meaningful change, returns needUpdate=true +// * if ref makes a meaningful change, and this change is a graceful deletion, returns needGracefulDelete=true // * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true -// * else return both false -// Now, needUpdate and needReconcile should never be both true -func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) { +// * else return all false +// Now, at most one of needUpdate, needGracefulDelete and needReconcile can be true +func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile, needGracefulDelete bool) { + + // 1. this is a reconcile // TODO: it would be better to update the whole object and only preserve certain things // like the source annotation or the UID (to ensure safety) if !podsDifferSemantically(existing, ref) { @@ -431,7 +446,6 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) } return } - // this is an update // Overwrite the first-seen time with the existing one. This is our own // internal annotation, there is no need to update. @@ -443,7 +457,15 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds existing.Status = ref.Status updateAnnotations(existing, ref) - needUpdate = true + + // 2. this is a graceful delete + if ref.DeletionTimestamp != nil { + needGracefulDelete = true + } else { + // 3. 
this is an update + needUpdate = true + } + return } diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 5bc6a114a5a..cc9d8af5af5 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -22,8 +22,10 @@ import ( "sort" "strconv" "testing" + "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/conversion" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -248,6 +250,25 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod)) } +func TestNewPodAddedDelete(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // should register an add + addedPod := CreateValidPod("foo", "new") + podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)) + + // mark this pod as deleted + timestamp := unversioned.NewTime(time.Now()) + deletedPod := CreateValidPod("foo", "new") + deletedPod.ObjectMeta.DeletionTimestamp = &timestamp + podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod) + channel <- podUpdate + // the existing pod should be gracefully deleted + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod)) +} + func TestNewPodAddedUpdatedSet(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 37b758fc387..93f54f103a2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -162,7 +162,7 @@ const ( type SyncHandler interface { HandlePodAdditions(pods []*api.Pod) HandlePodUpdates(pods []*api.Pod) - HandlePodDeletions(pods []*api.Pod) + HandlePodRemoves(pods []*api.Pod) HandlePodReconcile(pods []*api.Pod) HandlePodSyncs(pods []*api.Pod) HandlePodCleanups() 
error @@ -1774,7 +1774,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { // pod - the pod to sync // mirrorPod - the mirror pod for the pod to sync, if it is a static pod // podStatus - the current status (TODO: always from the status manager?) -// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE) +// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE, DELETE) // // The workflow is: // * If the pod is being created, record pod worker start latency @@ -2642,13 +2642,18 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle handler.HandlePodUpdates(u.Pods) case kubetypes.REMOVE: glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods)) - handler.HandlePodDeletions(u.Pods) + handler.HandlePodRemoves(u.Pods) case kubetypes.RECONCILE: glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodReconcile(u.Pods) + case kubetypes.DELETE: + glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods)) + // DELETE is treated as an UPDATE because of graceful deletion. + handler.HandlePodUpdates(u.Pods) case kubetypes.SET: // TODO: Do we want to support this? glog.Errorf("Kubelet does not support snapshot update") + } case e := <-plegCh: // PLEG event for a pod; sync it. @@ -2784,9 +2789,9 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) { } } -// HandlePodDeletions is the callback in the SyncHandler interface for pods -// being deleted from a config source. -func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { +// HandlePodRemoves is the callback in the SyncHandler interface for pods +// being removed from a config source. 
+func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.DeletePod(pod) diff --git a/pkg/kubelet/types/pod_update.go b/pkg/kubelet/types/pod_update.go index 88bcb54b32e..840a3513588 100644 --- a/pkg/kubelet/types/pod_update.go +++ b/pkg/kubelet/types/pod_update.go @@ -35,6 +35,8 @@ const ( SET PodOperation = iota // Pods with the given ids are new to this source ADD + // Pods with the given ids are gracefully deleted from this source + DELETE // Pods with the given ids have been removed from this source REMOVE // Pods with the given ids have been updated in this source