Merge pull request #15264 from yujuhong/work_queue

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-11-03 22:51:18 -08:00
commit 859f75f436
9 changed files with 272 additions and 58 deletions

View File

@@ -1827,8 +1827,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 }
 // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
-err = dm.KillPod(pod, runningPod)
-if err != nil {
+if err := dm.KillPod(pod, runningPod); err != nil {
 return err
 }
 } else {
@@ -1845,9 +1844,9 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 break
 }
 }
-err = dm.KillContainerInPod(container.ID, podContainer, pod)
-if err != nil {
+if err := dm.KillContainerInPod(container.ID, podContainer, pod); err != nil {
 glog.Errorf("Error killing container: %v", err)
+return err
 }
 }
 }
@@ -1893,6 +1892,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)
 }
+containersStarted := 0
 // Start everything
 for idx := range containerChanges.ContainersToStart {
 container := &pod.Spec.Containers[idx]
@@ -1946,11 +1946,15 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)
 continue
 }
+containersStarted++
 // Successfully started the container; clear the entry in the failure
 // reason cache.
 dm.clearReasonCache(pod, container)
 }
+if containersStarted != len(containerChanges.ContainersToStart) {
+return fmt.Errorf("not all containers have started: %d != %d", containersStarted, containerChanges.ContainersToStart)
+}
 return nil
 }

View File

@@ -538,7 +538,7 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 {
 // runSyncPod is a helper function to retrieve the running pods from the fake
 // docker client and runs SyncPod for the given pod.
-func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff) {
+func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff, expectErr bool) {
 runningPods, err := dm.GetPods(false)
 if err != nil {
 t.Fatalf("unexpected error: %v", err)
@@ -554,8 +554,10 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
 backOff = util.NewBackOff(time.Second, time.Minute)
 }
 err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{}, backOff)
-if err != nil {
+if err != nil && !expectErr {
 t.Errorf("unexpected error: %v", err)
+} else if err == nil && expectErr {
+t.Errorf("expected error didn't occur")
 }
 }
@@ -576,7 +578,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Create pod infra container.
 "create", "start", "inspect_container", "inspect_container",
@@ -623,7 +625,7 @@ func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Create pod infra container.
@@ -675,7 +677,7 @@ func TestSyncPodWithPodInfraCreatesContainer(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Inspect pod infra container (but does not create)"
@@ -722,7 +724,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Kill the container since pod infra container is not running.
@@ -795,7 +797,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Check the pod infra container.
@@ -849,7 +851,7 @@ func TestSyncPodBadHash(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Check the pod infra container.
@@ -906,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
 }
 dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil)
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Check the pod infra container.
@@ -963,7 +965,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Check the pod infra contianer.
@@ -1004,7 +1006,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
 Message: "Container image \"pull_never_image\" is not present with pull policy of Never"}},
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, true)
 statuses, err := dm.GetPodStatus(pod)
 if err != nil {
 t.Errorf("unable to get pod status")
@@ -1147,7 +1149,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
 fakeDocker.ContainerMap = containerMap
 pod.Spec.RestartPolicy = tt.policy
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 // 'stop' is because the pod infra container is killed when no container is running.
 verifyCalls(t, fakeDocker, tt.calls)
@@ -1267,7 +1269,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 // Check if we can retrieve the pod status.
 status, err := dm.GetPodStatus(pod)
@@ -1377,16 +1379,17 @@ func TestSyncPodBackoff(t *testing.T) {
 backoff int
 killDelay int
 result []string
+expectErr bool
 }{
-{1, 1, 1, startCalls},
-{2, 2, 2, startCalls},
-{3, 2, 3, backOffCalls},
-{4, 4, 4, startCalls},
-{5, 4, 5, backOffCalls},
-{6, 4, 6, backOffCalls},
-{7, 4, 7, backOffCalls},
-{8, 8, 129, startCalls},
-{130, 1, 0, startCalls},
+{1, 1, 1, startCalls, false},
+{2, 2, 2, startCalls, false},
+{3, 2, 3, backOffCalls, true},
+{4, 4, 4, startCalls, false},
+{5, 4, 5, backOffCalls, true},
+{6, 4, 6, backOffCalls, true},
+{7, 4, 7, backOffCalls, true},
+{8, 8, 129, startCalls, false},
+{130, 1, 0, startCalls, false},
 }
 backOff := util.NewBackOff(time.Second, time.Minute)
@@ -1397,7 +1400,7 @@ func TestSyncPodBackoff(t *testing.T) {
 fakeDocker.ContainerList = containerList
 fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second)
-runSyncPod(t, dm, fakeDocker, pod, backOff)
+runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr)
 verifyCalls(t, fakeDocker, c.result)
 if backOff.Get(stableId) != time.Duration(c.backoff)*time.Second {
@@ -1448,7 +1451,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, true)
 // Check if we can retrieve the pod status.
 status, err := dm.GetPodStatus(pod)
 if err != nil {
@@ -1504,7 +1507,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, true)
 // Check if we can retrieve the pod status.
 status, err := dm.GetPodStatus(pod)
 if err != nil {
@@ -1544,7 +1547,7 @@ func TestGetRestartCount(t *testing.T) {
 // Helper function for verifying the restart count.
 verifyRestartCount := func(pod *api.Pod, expectedCount int) api.PodStatus {
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 status, err := dm.GetPodStatus(pod)
 if err != nil {
 t.Fatalf("unexpected error %v", err)
@@ -1621,7 +1624,7 @@ func TestGetTerminationMessagePath(t *testing.T) {
 fakeDocker.ContainerMap = map[string]*docker.Container{}
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 containerList := fakeDocker.ContainerList
 if len(containerList) != 2 {
@@ -1680,7 +1683,7 @@ func TestSyncPodWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Check the pod infra container.
@@ -1743,7 +1746,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, true)
 verifyCalls(t, fakeDocker, []string{
 // Check the pod infra container.
@@ -1817,7 +1820,7 @@ func TestSyncPodWithTerminationLog(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Create pod infra container.
 "create", "start", "inspect_container", "inspect_container",
@@ -1857,7 +1860,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) {
 },
 }
-runSyncPod(t, dm, fakeDocker, pod, nil)
+runSyncPod(t, dm, fakeDocker, pod, nil, false)
 verifyCalls(t, fakeDocker, []string{
 // Create pod infra container.

View File

@@ -61,6 +61,7 @@ import (
 "k8s.io/kubernetes/pkg/kubelet/status"
 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
+"k8s.io/kubernetes/pkg/kubelet/util/queue"
 "k8s.io/kubernetes/pkg/labels"
 "k8s.io/kubernetes/pkg/runtime"
 "k8s.io/kubernetes/pkg/securitycontext"
@@ -432,7 +433,10 @@ func NewMainKubelet(
 return nil, err
 }
 klet.runtimeCache = runtimeCache
-klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)
+klet.workQueue = queue.NewBasicWorkQueue()
+// TODO(yujuhong): backoff and resync interval should be set differently
+// once we switch to using pod event generator.
+klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval)
 metrics.Register(runtimeCache)
@@ -468,13 +472,14 @@ type nodeLister interface {
 // Kubelet is the main kubelet implementation.
 type Kubelet struct {
 hostname string
 nodeName string
 dockerClient dockertools.DockerInterface
 runtimeCache kubecontainer.RuntimeCache
 kubeClient client.Interface
 rootDirectory string
 podWorkers PodWorkers
 resyncInterval time.Duration
 resyncTicker *time.Ticker
 sourcesReady SourcesReadyFn
@@ -642,6 +647,9 @@ type Kubelet struct {
 // Information about the ports which are opened by daemons on Node running this Kubelet server.
 daemonEndpoints *api.NodeDaemonEndpoints
+// A queue used to trigger pod workers.
+workQueue queue.WorkQueue
 }
 func (kl *Kubelet) allSourcesReady() bool {
@@ -1417,7 +1425,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 return nil
 }
-func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
+func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
 podFullName := kubecontainer.GetPodFullName(pod)
 uid := pod.UID
 start := time.Now()
@@ -1438,6 +1446,8 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 status, err := kl.generatePodStatus(pod)
 if err != nil {
 glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
+// Propagate the error upstream.
+syncErr = err
 } else {
 podToUpdate := pod
 if mirrorPod != nil {
@@ -2073,7 +2083,10 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
 // state every sync-frequency seconds. Never returns.
 func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
 glog.Info("Starting kubelet main sync loop.")
-kl.resyncTicker = time.NewTicker(kl.resyncInterval)
+// The resyncTicker wakes up kubelet to checks if there are any pod workers
+// that need to be sync'd. A one-second period is sufficient because the
+// sync interval is defaulted to 10s.
+kl.resyncTicker = time.NewTicker(time.Second)
 var housekeepingTimestamp time.Time
 for {
 if !kl.containerRuntimeUp() {
@@ -2138,9 +2151,15 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 glog.Errorf("Kubelet does not support snapshot update")
 }
 case <-kl.resyncTicker.C:
-// Periodically syncs all the pods and performs cleanup tasks.
-glog.V(4).Infof("SyncLoop (periodic sync)")
-handler.HandlePodSyncs(kl.podManager.GetPods())
+podUIDs := kl.workQueue.GetWork()
+var podsToSync []*api.Pod
+for _, uid := range podUIDs {
+if pod, ok := kl.podManager.GetPodByUID(uid); ok {
+podsToSync = append(podsToSync, pod)
+}
+}
+glog.V(2).Infof("SyncLoop (SYNC): %d pods", len(podsToSync))
+kl.HandlePodSyncs(podsToSync)
 case update := <-kl.livenessManager.Updates():
 // We only care about failures (signalling container death) here.
 if update.Result == proberesults.Failure {
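
Note on the new resync path above: the ticker now fires every second, but only pods whose work-queue entries have come due are re-synced, instead of every known pod on each tick. Below is a minimal, self-contained sketch of that dispatch step, assuming only the interfaces introduced in this PR; podLookup and fakePods are illustrative stand-ins for the kubelet's pod manager, not code from this change.

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
	"k8s.io/kubernetes/pkg/types"
)

// podLookup is a hypothetical stand-in for the pod manager's GetPodByUID.
type podLookup interface {
	GetPodByUID(types.UID) (*api.Pod, bool)
}

type fakePods map[types.UID]*api.Pod

func (f fakePods) GetPodByUID(uid types.UID) (*api.Pod, bool) {
	pod, ok := f[uid]
	return pod, ok
}

// podsDueForSync mirrors the resyncTicker case above: drain the UIDs that are
// ready and resolve them back to pods, skipping pods that no longer exist.
func podsDueForSync(wq queue.WorkQueue, pods podLookup) []*api.Pod {
	var due []*api.Pod
	for _, uid := range wq.GetWork() {
		if pod, ok := pods.GetPodByUID(uid); ok {
			due = append(due, pod)
		}
	}
	return due
}

func main() {
	wq := queue.NewBasicWorkQueue()
	pods := fakePods{"pod-a": {}, "pod-b": {}}
	wq.Enqueue(types.UID("pod-a"), -time.Second) // already due
	wq.Enqueue(types.UID("pod-b"), time.Minute)  // not due for another minute
	fmt.Println(len(podsDueForSync(wq, pods)))   // prints 1
}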

View File

@@ -50,6 +50,7 @@ import (
 proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 "k8s.io/kubernetes/pkg/kubelet/status"
 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+"k8s.io/kubernetes/pkg/kubelet/util/queue"
 "k8s.io/kubernetes/pkg/runtime"
 "k8s.io/kubernetes/pkg/types"
 "k8s.io/kubernetes/pkg/util"
@@ -145,6 +146,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 kubelet.backOff.Clock = fakeClock
 kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
 kubelet.resyncInterval = 10 * time.Second
+kubelet.workQueue = queue.NewBasicWorkQueue()
 return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 }

View File

@@ -43,6 +43,7 @@ type Manager interface {
 GetPods() []*api.Pod
 GetPodByFullName(podFullName string) (*api.Pod, bool)
 GetPodByName(namespace, name string) (*api.Pod, bool)
+GetPodByUID(types.UID) (*api.Pod, bool)
 GetPodByMirrorPod(*api.Pod) (*api.Pod, bool)
 GetMirrorPodByPod(*api.Pod) (*api.Pod, bool)
 GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod)
@@ -177,6 +178,15 @@ func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
 return pm.GetPodByFullName(podFullName)
 }
+// GetPodByUID provides the (non-mirror) pod that matches pod UID as well as
+// whether the pod was found.
+func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
+pm.lock.RLock()
+defer pm.lock.RUnlock()
+pod, ok := pm.podByUID[uid]
+return pod, ok
+}
 // GetPodByName returns the (non-mirror) pod that matches full name, as well as
 // whether the pod was found.
 func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {

View File

@@ -25,6 +25,7 @@ import (
 "k8s.io/kubernetes/pkg/client/record"
 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+"k8s.io/kubernetes/pkg/kubelet/util/queue"
 "k8s.io/kubernetes/pkg/types"
 "k8s.io/kubernetes/pkg/util"
 )
@@ -55,6 +56,8 @@ type podWorkers struct {
 // runtimeCache is used for listing running containers.
 runtimeCache kubecontainer.RuntimeCache
+workQueue queue.WorkQueue
 // This function is run to sync the desired stated of pod.
 // NOTE: This function has to be thread-safe - it can be called for
 // different pods at the same time.
@@ -62,6 +65,12 @@ type podWorkers struct {
 // The EventRecorder to use
 recorder record.EventRecorder
+// backOffPeriod is the duration to back off when there is a sync error.
+backOffPeriod time.Duration
+// resyncInterval is the duration to wait until the next sync.
+resyncInterval time.Duration
 }
 type workUpdate struct {
@@ -79,7 +88,7 @@ type workUpdate struct {
 }
 func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
-recorder record.EventRecorder) *podWorkers {
+recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
 return &podWorkers{
 podUpdates: map[types.UID]chan workUpdate{},
 isWorking: map[types.UID]bool{},
@@ -87,37 +96,40 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
 runtimeCache: runtimeCache,
 syncPodFn: syncPodFn,
 recorder: recorder,
+workQueue: workQueue,
+resyncInterval: resyncInterval,
+backOffPeriod: backOffPeriod,
 }
 }
 func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
 var minRuntimeCacheTime time.Time
 for newWork := range podUpdates {
-func() {
-defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
+err := func() (err error) {
 // We would like to have the state of the containers from at least
 // the moment when we finished the previous processing of that pod.
 if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
 glog.Errorf("Error updating the container runtime cache: %v", err)
-return
+return err
 }
 pods, err := p.runtimeCache.GetPods()
 if err != nil {
 glog.Errorf("Error getting pods while syncing pod: %v", err)
-return
+return err
 }
 err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
 kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType)
+minRuntimeCacheTime = time.Now()
 if err != nil {
 glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
 p.recorder.Eventf(newWork.pod, "FailedSync", "Error syncing pod, skipping: %v", err)
-return
+return err
 }
-minRuntimeCacheTime = time.Now()
 newWork.updateCompleteFn()
+return nil
 }()
+p.wrapUp(newWork.pod.UID, err)
 }
 }
@@ -192,7 +204,17 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
 }
 }
-func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
+func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
+// Requeue the last update if the last sync returned error.
+if syncErr != nil {
+p.workQueue.Enqueue(uid, p.backOffPeriod)
+} else {
+p.workQueue.Enqueue(uid, p.resyncInterval)
+}
+p.checkForUpdates(uid)
+}
+func (p *podWorkers) checkForUpdates(uid types.UID) {
 p.podLock.Lock()
 defer p.podLock.Unlock()
 if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
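
The wrapUp change above is the core of the feedback loop: every completed sync requeues the pod for a future sync, with a shorter delay after a failure. Below is a toy, self-contained sketch of that policy; the map-based queue, helper names, and durations are illustrative only, and note that this PR currently wires both delays to the kubelet's resyncInterval (see the TODO in NewMainKubelet).

package main

import (
	"errors"
	"fmt"
	"time"
)

// toyQueue stands in for the real WorkQueue: UID -> time the item becomes ready.
type toyQueue map[string]time.Time

func (q toyQueue) enqueue(uid string, delay time.Duration) {
	q[uid] = time.Now().Add(delay)
}

// syncOnce mimics managePodLoop + wrapUp: run the sync, then requeue the pod
// with a delay chosen by the outcome of the sync.
func syncOnce(q toyQueue, uid string, sync func() error, backOffPeriod, resyncInterval time.Duration) {
	if err := sync(); err != nil {
		q.enqueue(uid, backOffPeriod) // failed: retry after the back-off period
		return
	}
	q.enqueue(uid, resyncInterval) // succeeded: normal periodic resync
}

func main() {
	q := toyQueue{}
	syncOnce(q, "pod-ok", func() error { return nil }, time.Second, 10*time.Second)
	syncOnce(q, "pod-bad", func() error { return errors.New("image pull failed") }, time.Second, 10*time.Second)
	fmt.Println(q["pod-ok"].Sub(time.Now()).Round(time.Second))  // roughly 10s from now
	fmt.Println(q["pod-bad"].Sub(time.Now()).Round(time.Second)) // roughly 1s from now
}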

View File

@@ -31,6 +31,7 @@ import (
 "k8s.io/kubernetes/pkg/kubelet/dockertools"
 "k8s.io/kubernetes/pkg/kubelet/network"
 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+"k8s.io/kubernetes/pkg/kubelet/util/queue"
 "k8s.io/kubernetes/pkg/types"
 )
@@ -66,6 +67,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
 return nil
 },
 fakeRecorder,
+queue.NewBasicWorkQueue(),
+time.Second,
+time.Second,
 )
 return podWorkers, processed
 }
@@ -200,7 +204,7 @@ func TestFakePodWorkers(t *testing.T) {
 kubeletForRealWorkers := &simpleFakeKubelet{}
 kubeletForFakeWorkers := &simpleFakeKubelet{}
-realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder)
+realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
 fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
 tests := []struct {

View File

@ -0,0 +1,73 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"sync"
"time"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
// WorkQueue allows queueing items with a timestamp. An item is
// considered ready to process if the timestamp has expired.
type WorkQueue interface {
// GetWork dequeues and returns all ready items.
GetWork() []types.UID
// Enqueue inserts a new item or overwrites an existing item with the
// new timestamp (time.Now() + delay) if it is greater.
Enqueue(item types.UID, delay time.Duration)
}
type basicWorkQueue struct {
clock util.Clock
lock sync.Mutex
queue map[types.UID]time.Time
}
var _ WorkQueue = &basicWorkQueue{}
func NewBasicWorkQueue() WorkQueue {
queue := make(map[types.UID]time.Time)
return &basicWorkQueue{queue: queue, clock: util.RealClock{}}
}
func (q *basicWorkQueue) GetWork() []types.UID {
q.lock.Lock()
defer q.lock.Unlock()
now := q.clock.Now()
var items []types.UID
for k, v := range q.queue {
if v.Before(now) {
items = append(items, k)
delete(q.queue, k)
}
}
return items
}
func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) {
q.lock.Lock()
defer q.lock.Unlock()
now := q.clock.Now()
timestamp := now.Add(delay)
existing, ok := q.queue[item]
if !ok || (ok && existing.Before(timestamp)) {
q.queue[item] = timestamp
}
}
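
A small usage sketch of the WorkQueue added above, runnable against the package path introduced in this PR; the pod names and delays are made up for illustration.

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/kubelet/util/queue"
	"k8s.io/kubernetes/pkg/types"
)

func main() {
	wq := queue.NewBasicWorkQueue()

	// An item whose timestamp is already in the past is ready immediately.
	wq.Enqueue(types.UID("pod-a"), -time.Second)
	// An item with a future timestamp stays hidden until the delay elapses.
	wq.Enqueue(types.UID("pod-b"), 50*time.Millisecond)

	fmt.Println(wq.GetWork()) // [pod-a]; GetWork also dequeues what it returns
	fmt.Println(wq.GetWork()) // [] until pod-b comes due

	time.Sleep(100 * time.Millisecond)
	fmt.Println(wq.GetWork()) // [pod-b]
}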

View File

@ -0,0 +1,77 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
)
func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) {
fakeClock := &util.FakeClock{Time: time.Now()}
wq := &basicWorkQueue{
clock: fakeClock,
queue: make(map[types.UID]time.Time),
}
return wq, fakeClock
}
func compareResults(t *testing.T, expected, actual []types.UID) {
expectedSet := sets.NewString()
for _, u := range expected {
expectedSet.Insert(string(u))
}
actualSet := sets.NewString()
for _, u := range actual {
actualSet.Insert(string(u))
}
if !expectedSet.Equal(actualSet) {
t.Errorf("Expected %#v, got %#v", expectedSet.List(), actualSet.List())
}
}
func TestGetWork(t *testing.T) {
q, clock := newTestBasicWorkQueue()
q.Enqueue(types.UID("foo1"), -1*time.Minute)
q.Enqueue(types.UID("foo2"), -1*time.Minute)
q.Enqueue(types.UID("foo3"), 1*time.Minute)
q.Enqueue(types.UID("foo4"), 1*time.Minute)
expected := []types.UID{types.UID("foo1"), types.UID("foo2")}
compareResults(t, expected, q.GetWork())
compareResults(t, []types.UID{}, q.GetWork())
// Dial the time to 1 hour ahead.
clock.Step(time.Hour)
expected = []types.UID{types.UID("foo3"), types.UID("foo4")}
compareResults(t, expected, q.GetWork())
compareResults(t, []types.UID{}, q.GetWork())
}
func TestEnqueueKeepGreaterTimestamp(t *testing.T) {
q, _ := newTestBasicWorkQueue()
item := types.UID("foo")
q.Enqueue(item, -7*time.Hour)
q.Enqueue(item, 3*time.Hour)
compareResults(t, []types.UID{}, q.GetWork())
q.Enqueue(item, 3*time.Hour)
q.Enqueue(item, -7*time.Hour)
compareResults(t, []types.UID{}, q.GetWork())
}