diff --git a/pkg/kubelet/fake_pod_workers.go b/pkg/kubelet/fake_pod_workers.go index 7f036e60fd0..14fb671f215 100644 --- a/pkg/kubelet/fake_pod_workers.go +++ b/pkg/kubelet/fake_pod_workers.go @@ -30,7 +30,7 @@ type fakePodWorkers struct { t TestingInterface } -func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) { +func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) { pods, err := f.runtimeCache.GetPods() if err != nil { f.t.Errorf("Unexpected error: %v", err) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c9c31e5cb80..48f5cc26104 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1304,9 +1304,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont // status. Any race conditions here effectively boils down to -- the pod worker didn't sync // state of a newly started container with the apiserver before the kubelet restarted, so // it's OK to pretend like the kubelet started them after it restarted. - // - // Also note that deletes currently have an updateType of `create` set in UpdatePods. - // This, again, does not matter because deletes are not processed by this method. var podStatus api.PodStatus if updateType == SyncPodCreate { @@ -1952,7 +1949,7 @@ func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *a return } // Run the sync in an async worker. - kl.podWorkers.UpdatePod(pod, mirrorPod, func() { + kl.podWorkers.UpdatePod(pod, mirrorPod, syncType, func() { metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start)) }) // Note the number of containers for new pods. diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go index 1ea358accd1..5fb6e6b63ad 100644 --- a/pkg/kubelet/pod_manager.go +++ b/pkg/kubelet/pod_manager.go @@ -63,28 +63,6 @@ type podManager interface { mirrorClient } -// SyncPodType classifies pod updates, eg: create, update. -type SyncPodType int - -const ( - SyncPodSync SyncPodType = iota - SyncPodUpdate - SyncPodCreate -) - -func (sp SyncPodType) String() string { - switch sp { - case SyncPodCreate: - return "create" - case SyncPodUpdate: - return "update" - case SyncPodSync: - return "sync" - default: - return "unknown" - } -} - // All maps in basicPodManager should be set by calling UpdatePods(); // individual arrays/maps are not immutable and no other methods should attempt // to modify them. diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 85cd333e263..01a28a25e10 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -30,7 +30,7 @@ import ( // PodWorkers is an abstract interface for testability. type PodWorkers interface { - UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) + UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) ForgetWorker(uid types.UID) } @@ -121,19 +121,11 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { } // Apply the new setting to the specified pod. updateComplete is called when the update is completed. -func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) { +func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) { uid := pod.UID var podUpdates chan workUpdate var exists bool - // TODO: Pipe this through from the kubelet. Currently kubelets operating with - // snapshot updates (PodConfigNotificationSnapshot) will send updates, creates - // and deletes as SET operations, which makes updates indistinguishable from - // creates. The intent here is to communicate to the pod worker that it can take - // certain liberties, like skipping status generation, when it receives a create - // event for a pod. - updateType := SyncPodUpdate - p.podLock.Lock() defer p.podLock.Unlock() if podUpdates, exists = p.podUpdates[uid]; !exists { @@ -148,7 +140,6 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete // kubelet just restarted. In either case the kubelet is willing to believe // the status of the pod for the first pod worker sync. See corresponding // comment in syncPod. - updateType = SyncPodCreate go func() { defer util.HandleCrash() p.managePodLoop(podUpdates) diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 8ffa19588c0..8b2a3e5e091 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -31,7 +31,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" ) func newPod(uid, name string) *api.Pod { @@ -94,7 +93,7 @@ func TestUpdatePod(t *testing.T) { numPods := 20 for i := 0; i < numPods; i++ { for j := i; j < numPods; j++ { - podWorkers.UpdatePod(newPod(string(j), string(i)), nil, func() {}) + podWorkers.UpdatePod(newPod(string(j), string(i)), nil, SyncPodCreate, func() {}) } } drainWorkers(podWorkers, numPods) @@ -122,44 +121,12 @@ func TestUpdatePod(t *testing.T) { } } -func TestUpdateType(t *testing.T) { - syncType := make(chan SyncPodType) - fakeRecorder := &record.FakeRecorder{} - podWorkers := newPodWorkers( - createFakeRuntimeCache(fakeRecorder), - func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error { - func() { - syncType <- updateType - }() - return nil - }, - fakeRecorder, - ) - cases := map[*api.Pod][]SyncPodType{ - newPod("u1", "n1"): {SyncPodCreate, SyncPodUpdate}, - newPod("u2", "n1"): {SyncPodCreate}, - } - for p, expectedTypes := range cases { - for i := range expectedTypes { - podWorkers.UpdatePod(p, nil, func() {}) - select { - case gotType := <-syncType: - if gotType != expectedTypes[i] { - t.Fatalf("Expected sync type %v got %v for pod with uid %v", expectedTypes[i], gotType, p.UID) - } - case <-time.After(util.ForeverTestTimeout): - t.Errorf("Unexpected delay is running pod worker") - } - } - } -} - func TestForgetNonExistingPodWorkers(t *testing.T) { podWorkers, _ := createPodWorkers() numPods := 20 for i := 0; i < numPods; i++ { - podWorkers.UpdatePod(newPod(string(i), "name"), nil, func() {}) + podWorkers.UpdatePod(newPod(string(i), "name"), nil, SyncPodUpdate, func() {}) } drainWorkers(podWorkers, numPods) @@ -386,8 +353,8 @@ func TestFakePodWorkers(t *testing.T) { kubeletForRealWorkers.wg.Add(1) fakeDocker.ContainerList = tt.containerList - realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {}) - fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {}) + realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {}) + fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {}) kubeletForRealWorkers.wg.Wait() diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index 63db74864d8..7d0359cf528 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -88,3 +88,25 @@ func GetValidatedSources(sources []string) ([]string, error) { } return validated, nil } + +// SyncPodType classifies pod updates, eg: create, update. +type SyncPodType int + +const ( + SyncPodSync SyncPodType = iota + SyncPodUpdate + SyncPodCreate +) + +func (sp SyncPodType) String() string { + switch sp { + case SyncPodCreate: + return "create" + case SyncPodUpdate: + return "update" + case SyncPodSync: + return "sync" + default: + return "unknown" + } +}