From 889e798ddb8739d5fe5c77b1fc1388dc6363b778 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Thu, 1 Oct 2015 16:25:07 -0700 Subject: [PATCH] kubelet: pipe SyncPodType to pod workers Now that kubelet has switched to incremental updates, it has complete information of the pod update type (create, update, sync). This change pipes this information to pod workers so that they don't have to derive the type again. --- pkg/kubelet/fake_pod_workers.go | 2 +- pkg/kubelet/kubelet.go | 5 +--- pkg/kubelet/pod_manager.go | 22 ------------------ pkg/kubelet/pod_workers.go | 13 ++--------- pkg/kubelet/pod_workers_test.go | 41 ++++----------------------------- pkg/kubelet/types.go | 22 ++++++++++++++++++ 6 files changed, 30 insertions(+), 75 deletions(-) diff --git a/pkg/kubelet/fake_pod_workers.go b/pkg/kubelet/fake_pod_workers.go index 7f036e60fd0..14fb671f215 100644 --- a/pkg/kubelet/fake_pod_workers.go +++ b/pkg/kubelet/fake_pod_workers.go @@ -30,7 +30,7 @@ type fakePodWorkers struct { t TestingInterface } -func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) { +func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) { pods, err := f.runtimeCache.GetPods() if err != nil { f.t.Errorf("Unexpected error: %v", err) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c9c31e5cb80..48f5cc26104 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1304,9 +1304,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont // status. Any race conditions here effectively boils down to -- the pod worker didn't sync // state of a newly started container with the apiserver before the kubelet restarted, so // it's OK to pretend like the kubelet started them after it restarted. - // - // Also note that deletes currently have an updateType of `create` set in UpdatePods. - // This, again, does not matter because deletes are not processed by this method. var podStatus api.PodStatus if updateType == SyncPodCreate { @@ -1952,7 +1949,7 @@ func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *a return } // Run the sync in an async worker. - kl.podWorkers.UpdatePod(pod, mirrorPod, func() { + kl.podWorkers.UpdatePod(pod, mirrorPod, syncType, func() { metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start)) }) // Note the number of containers for new pods. diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go index 1ea358accd1..5fb6e6b63ad 100644 --- a/pkg/kubelet/pod_manager.go +++ b/pkg/kubelet/pod_manager.go @@ -63,28 +63,6 @@ type podManager interface { mirrorClient } -// SyncPodType classifies pod updates, eg: create, update. -type SyncPodType int - -const ( - SyncPodSync SyncPodType = iota - SyncPodUpdate - SyncPodCreate -) - -func (sp SyncPodType) String() string { - switch sp { - case SyncPodCreate: - return "create" - case SyncPodUpdate: - return "update" - case SyncPodSync: - return "sync" - default: - return "unknown" - } -} - // All maps in basicPodManager should be set by calling UpdatePods(); // individual arrays/maps are not immutable and no other methods should attempt // to modify them. diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 85cd333e263..01a28a25e10 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -30,7 +30,7 @@ import ( // PodWorkers is an abstract interface for testability. type PodWorkers interface { - UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) + UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) ForgetWorker(uid types.UID) } @@ -121,19 +121,11 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { } // Apply the new setting to the specified pod. updateComplete is called when the update is completed. -func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) { +func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) { uid := pod.UID var podUpdates chan workUpdate var exists bool - // TODO: Pipe this through from the kubelet. Currently kubelets operating with - // snapshot updates (PodConfigNotificationSnapshot) will send updates, creates - // and deletes as SET operations, which makes updates indistinguishable from - // creates. The intent here is to communicate to the pod worker that it can take - // certain liberties, like skipping status generation, when it receives a create - // event for a pod. - updateType := SyncPodUpdate - p.podLock.Lock() defer p.podLock.Unlock() if podUpdates, exists = p.podUpdates[uid]; !exists { @@ -148,7 +140,6 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete // kubelet just restarted. In either case the kubelet is willing to believe // the status of the pod for the first pod worker sync. See corresponding // comment in syncPod. - updateType = SyncPodCreate go func() { defer util.HandleCrash() p.managePodLoop(podUpdates) diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 8ffa19588c0..8b2a3e5e091 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -31,7 +31,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" ) func newPod(uid, name string) *api.Pod { @@ -94,7 +93,7 @@ func TestUpdatePod(t *testing.T) { numPods := 20 for i := 0; i < numPods; i++ { for j := i; j < numPods; j++ { - podWorkers.UpdatePod(newPod(string(j), string(i)), nil, func() {}) + podWorkers.UpdatePod(newPod(string(j), string(i)), nil, SyncPodCreate, func() {}) } } drainWorkers(podWorkers, numPods) @@ -122,44 +121,12 @@ func TestUpdatePod(t *testing.T) { } } -func TestUpdateType(t *testing.T) { - syncType := make(chan SyncPodType) - fakeRecorder := &record.FakeRecorder{} - podWorkers := newPodWorkers( - createFakeRuntimeCache(fakeRecorder), - func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error { - func() { - syncType <- updateType - }() - return nil - }, - fakeRecorder, - ) - cases := map[*api.Pod][]SyncPodType{ - newPod("u1", "n1"): {SyncPodCreate, SyncPodUpdate}, - newPod("u2", "n1"): {SyncPodCreate}, - } - for p, expectedTypes := range cases { - for i := range expectedTypes { - podWorkers.UpdatePod(p, nil, func() {}) - select { - case gotType := <-syncType: - if gotType != expectedTypes[i] { - t.Fatalf("Expected sync type %v got %v for pod with uid %v", expectedTypes[i], gotType, p.UID) - } - case <-time.After(util.ForeverTestTimeout): - t.Errorf("Unexpected delay is running pod worker") - } - } - } -} - func TestForgetNonExistingPodWorkers(t *testing.T) { podWorkers, _ := createPodWorkers() numPods := 20 for i := 0; i < numPods; i++ { - podWorkers.UpdatePod(newPod(string(i), "name"), nil, func() {}) + podWorkers.UpdatePod(newPod(string(i), "name"), nil, SyncPodUpdate, func() {}) } drainWorkers(podWorkers, numPods) @@ -386,8 +353,8 @@ func TestFakePodWorkers(t *testing.T) { kubeletForRealWorkers.wg.Add(1) fakeDocker.ContainerList = tt.containerList - realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {}) - fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {}) + realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {}) + fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {}) kubeletForRealWorkers.wg.Wait() diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index 63db74864d8..7d0359cf528 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -88,3 +88,25 @@ func GetValidatedSources(sources []string) ([]string, error) { } return validated, nil } + +// SyncPodType classifies pod updates, eg: create, update. +type SyncPodType int + +const ( + SyncPodSync SyncPodType = iota + SyncPodUpdate + SyncPodCreate +) + +func (sp SyncPodType) String() string { + switch sp { + case SyncPodCreate: + return "create" + case SyncPodUpdate: + return "update" + case SyncPodSync: + return "sync" + default: + return "unknown" + } +}