kubelet: pipe SyncPodType to pod workers

Now that kubelet has switched to incremental updates, it has complete
information of the pod update type (create, update, sync). This change pipes
this information to pod workers so that they don't have to derive the type
again.
This commit is contained in:
Yu-Ju Hong 2015-10-01 16:25:07 -07:00
parent b9293a093b
commit 889e798ddb
6 changed files with 30 additions and 75 deletions

View File

@ -30,7 +30,7 @@ type fakePodWorkers struct {
t TestingInterface
}
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) {
pods, err := f.runtimeCache.GetPods()
if err != nil {
f.t.Errorf("Unexpected error: %v", err)

View File

@ -1304,9 +1304,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
// state of a newly started container with the apiserver before the kubelet restarted, so
// it's OK to pretend like the kubelet started them after it restarted.
//
// Also note that deletes currently have an updateType of `create` set in UpdatePods.
// This, again, does not matter because deletes are not processed by this method.
var podStatus api.PodStatus
if updateType == SyncPodCreate {
@ -1952,7 +1949,7 @@ func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *a
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
kl.podWorkers.UpdatePod(pod, mirrorPod, syncType, func() {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.

View File

@ -63,28 +63,6 @@ type podManager interface {
mirrorClient
}
// SyncPodType classifies pod updates, eg: create, update.
type SyncPodType int
const (
SyncPodSync SyncPodType = iota
SyncPodUpdate
SyncPodCreate
)
func (sp SyncPodType) String() string {
switch sp {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}
// All maps in basicPodManager should be set by calling UpdatePods();
// individual arrays/maps are not immutable and no other methods should attempt
// to modify them.

View File

@ -30,7 +30,7 @@ import (
// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func())
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func())
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
ForgetWorker(uid types.UID)
}
@ -121,19 +121,11 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
}
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) {
uid := pod.UID
var podUpdates chan workUpdate
var exists bool
// TODO: Pipe this through from the kubelet. Currently kubelets operating with
// snapshot updates (PodConfigNotificationSnapshot) will send updates, creates
// and deletes as SET operations, which makes updates indistinguishable from
// creates. The intent here is to communicate to the pod worker that it can take
// certain liberties, like skipping status generation, when it receives a create
// event for a pod.
updateType := SyncPodUpdate
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
@ -148,7 +140,6 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
updateType = SyncPodCreate
go func() {
defer util.HandleCrash()
p.managePodLoop(podUpdates)

View File

@ -31,7 +31,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
func newPod(uid, name string) *api.Pod {
@ -94,7 +93,7 @@ func TestUpdatePod(t *testing.T) {
numPods := 20
for i := 0; i < numPods; i++ {
for j := i; j < numPods; j++ {
podWorkers.UpdatePod(newPod(string(j), string(i)), nil, func() {})
podWorkers.UpdatePod(newPod(string(j), string(i)), nil, SyncPodCreate, func() {})
}
}
drainWorkers(podWorkers, numPods)
@ -122,44 +121,12 @@ func TestUpdatePod(t *testing.T) {
}
}
func TestUpdateType(t *testing.T) {
syncType := make(chan SyncPodType)
fakeRecorder := &record.FakeRecorder{}
podWorkers := newPodWorkers(
createFakeRuntimeCache(fakeRecorder),
func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
func() {
syncType <- updateType
}()
return nil
},
fakeRecorder,
)
cases := map[*api.Pod][]SyncPodType{
newPod("u1", "n1"): {SyncPodCreate, SyncPodUpdate},
newPod("u2", "n1"): {SyncPodCreate},
}
for p, expectedTypes := range cases {
for i := range expectedTypes {
podWorkers.UpdatePod(p, nil, func() {})
select {
case gotType := <-syncType:
if gotType != expectedTypes[i] {
t.Fatalf("Expected sync type %v got %v for pod with uid %v", expectedTypes[i], gotType, p.UID)
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("Unexpected delay is running pod worker")
}
}
}
}
func TestForgetNonExistingPodWorkers(t *testing.T) {
podWorkers, _ := createPodWorkers()
numPods := 20
for i := 0; i < numPods; i++ {
podWorkers.UpdatePod(newPod(string(i), "name"), nil, func() {})
podWorkers.UpdatePod(newPod(string(i), "name"), nil, SyncPodUpdate, func() {})
}
drainWorkers(podWorkers, numPods)
@ -386,8 +353,8 @@ func TestFakePodWorkers(t *testing.T) {
kubeletForRealWorkers.wg.Add(1)
fakeDocker.ContainerList = tt.containerList
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {})
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {})
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {})
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {})
kubeletForRealWorkers.wg.Wait()

View File

@ -88,3 +88,25 @@ func GetValidatedSources(sources []string) ([]string, error) {
}
return validated, nil
}
// SyncPodType classifies pod updates, eg: create, update.
type SyncPodType int
const (
SyncPodSync SyncPodType = iota
SyncPodUpdate
SyncPodCreate
)
func (sp SyncPodType) String() string {
switch sp {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}