From 3beae6b70b744e8fcf1bd77133a8bdd84df9c4d2 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Thu, 5 Nov 2015 18:19:45 -0800 Subject: [PATCH 1/3] Add Status in the runtime Container type This is necessary for the generic PLEG to distinguish container events. --- pkg/kubelet/container/runtime.go | 11 +++++ pkg/kubelet/dockertools/convert.go | 16 +++++++ pkg/kubelet/dockertools/convert_test.go | 20 +++++++++ pkg/kubelet/dockertools/docker_test.go | 56 ++++++++++++++----------- pkg/kubelet/rkt/rkt.go | 9 +++- 5 files changed, 87 insertions(+), 25 deletions(-) diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 9a1590a91e0..5f58e1a4e36 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -198,6 +198,15 @@ func (c *ContainerID) UnmarshalJSON(data []byte) error { return c.ParseString(string(data)) } +type ContainerStatus string + +const ( + ContainerStatusRunning ContainerStatus = "running" + ContainerStatusExited ContainerStatus = "exited" + // This unknown encompasses all the statuses that we currently don't care. + ContainerStatusUnknown ContainerStatus = "unknown" +) + // Container provides the runtime information for a container, such as ID, hash, // status of the container. type Container struct { @@ -215,6 +224,8 @@ type Container struct { // The timestamp of the creation time of the container. // TODO(yifan): Consider to move it to api.ContainerStatus. Created int64 + // Status is the status of the container. + Status ContainerStatus } // Basic information about a container image. diff --git a/pkg/kubelet/dockertools/convert.go b/pkg/kubelet/dockertools/convert.go index 7f743f5bd35..b55366cf165 100644 --- a/pkg/kubelet/dockertools/convert.go +++ b/pkg/kubelet/dockertools/convert.go @@ -18,6 +18,7 @@ package dockertools import ( "fmt" + "strings" docker "github.com/fsouza/go-dockerclient" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -27,6 +28,19 @@ import ( // This file contains helper functions to convert docker API types to runtime // (kubecontainer) types. +func mapStatus(status string) kubecontainer.ContainerStatus { + // Parse the status string in docker.APIContainers. This could break when + // we upgrade docker. + switch { + case strings.HasPrefix(status, "Up"): + return kubecontainer.ContainerStatusRunning + case strings.HasPrefix(status, "Exited"): + return kubecontainer.ContainerStatusExited + default: + return kubecontainer.ContainerStatusUnknown + } +} + // Converts docker.APIContainers to kubecontainer.Container. func toRuntimeContainer(c *docker.APIContainers) (*kubecontainer.Container, error) { if c == nil { @@ -37,12 +51,14 @@ func toRuntimeContainer(c *docker.APIContainers) (*kubecontainer.Container, erro if err != nil { return nil, err } + return &kubecontainer.Container{ ID: kubetypes.DockerID(c.ID).ContainerID(), Name: dockerName.ContainerName, Image: c.Image, Hash: hash, Created: c.Created, + Status: mapStatus(c.Status), }, nil } diff --git a/pkg/kubelet/dockertools/convert_test.go b/pkg/kubelet/dockertools/convert_test.go index acc99f660f2..439100ae41a 100644 --- a/pkg/kubelet/dockertools/convert_test.go +++ b/pkg/kubelet/dockertools/convert_test.go @@ -24,12 +24,31 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) +func TestMapStatus(t *testing.T) { + testCases := []struct { + input string + expected kubecontainer.ContainerStatus + }{ + {input: "Up 5 hours", expected: kubecontainer.ContainerStatusRunning}, + {input: "Exited (0) 2 hours ago", expected: kubecontainer.ContainerStatusExited}, + {input: "Created", expected: kubecontainer.ContainerStatusUnknown}, + {input: "Random string", expected: kubecontainer.ContainerStatusUnknown}, + } + + for i, test := range testCases { + if actual := mapStatus(test.input); actual != test.expected { + t.Errorf("Test[%d]: expected %q, got %q", i, test.expected, actual) + } + } +} + func TestToRuntimeContainer(t *testing.T) { original := &docker.APIContainers{ ID: "ab2cdf", Image: "bar_image", Created: 12345, Names: []string{"/k8s_bar.5678_foo_ns_1234_42"}, + Status: "Up 5 hours", } expected := &kubecontainer.Container{ ID: kubecontainer.ContainerID{"docker", "ab2cdf"}, @@ -37,6 +56,7 @@ func TestToRuntimeContainer(t *testing.T) { Image: "bar_image", Hash: 0x5678, Created: 12345, + Status: kubecontainer.ContainerStatusRunning, } actual, err := toRuntimeContainer(original) diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 82cf3cd3ac5..fb7ecd88ad4 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -580,14 +580,16 @@ func TestFindContainersByPod(t *testing.T) { Namespace: "ns", Containers: []*kubecontainer.Container{ { - ID: kubetypes.DockerID("foobar").ContainerID(), - Name: "foobar", - Hash: 0x1234, + ID: kubetypes.DockerID("foobar").ContainerID(), + Name: "foobar", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, { - ID: kubetypes.DockerID("baz").ContainerID(), - Name: "baz", - Hash: 0x1234, + ID: kubetypes.DockerID("baz").ContainerID(), + Name: "baz", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, }, }, @@ -597,9 +599,10 @@ func TestFindContainersByPod(t *testing.T) { Namespace: "ns", Containers: []*kubecontainer.Container{ { - ID: kubetypes.DockerID("barbar").ContainerID(), - Name: "barbar", - Hash: 0x1234, + ID: kubetypes.DockerID("barbar").ContainerID(), + Name: "barbar", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, }, }, @@ -638,19 +641,22 @@ func TestFindContainersByPod(t *testing.T) { Namespace: "ns", Containers: []*kubecontainer.Container{ { - ID: kubetypes.DockerID("foobar").ContainerID(), - Name: "foobar", - Hash: 0x1234, + ID: kubetypes.DockerID("foobar").ContainerID(), + Name: "foobar", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, { - ID: kubetypes.DockerID("barfoo").ContainerID(), - Name: "barfoo", - Hash: 0x1234, + ID: kubetypes.DockerID("barfoo").ContainerID(), + Name: "barfoo", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, { - ID: kubetypes.DockerID("baz").ContainerID(), - Name: "baz", - Hash: 0x1234, + ID: kubetypes.DockerID("baz").ContainerID(), + Name: "baz", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, }, }, @@ -660,9 +666,10 @@ func TestFindContainersByPod(t *testing.T) { Namespace: "ns", Containers: []*kubecontainer.Container{ { - ID: kubetypes.DockerID("barbar").ContainerID(), - Name: "barbar", - Hash: 0x1234, + ID: kubetypes.DockerID("barbar").ContainerID(), + Name: "barbar", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, }, }, @@ -672,9 +679,10 @@ func TestFindContainersByPod(t *testing.T) { Namespace: "ns", Containers: []*kubecontainer.Container{ { - ID: kubetypes.DockerID("bazbaz").ContainerID(), - Name: "bazbaz", - Hash: 0x1234, + ID: kubetypes.DockerID("bazbaz").ContainerID(), + Name: "bazbaz", + Hash: 0x1234, + Status: kubecontainer.ContainerStatusUnknown, }, }, }, diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 3b30e6924c0..a302e6a34e0 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -793,7 +793,14 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { var pods []*kubecontainer.Pod for _, u := range units { if strings.HasPrefix(u.Name, kubernetesUnitPrefix) { - if !all && u.SubState != "running" { + var status kubecontainer.ContainerStatus + switch { + case u.SubState == "running": + status = kubecontainer.ContainerStatusRunning + default: + status = kubecontainer.ContainerStatusExited + } + if !all && status != kubecontainer.ContainerStatusRunning { continue } pod, _, err := r.readServiceFile(u.Name) From bc6414a8734416aa98a415a69a3b14adc10ab58e Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Fri, 7 Aug 2015 14:42:21 -0700 Subject: [PATCH 2/3] kubelet: add a generic pod lifecycle event generator This change introduces pod lifecycle event generator (PLEG), and adds a generic PLEG. The generic PLEG relies on relisting to discover container events, and is container-runtime-agnostic. Both docker and rkt are changed to use generic PLEG. --- pkg/kubelet/container/fake_runtime.go | 5 + pkg/kubelet/image_manager_test.go | 24 ++--- pkg/kubelet/kubelet.go | 42 +++++++- pkg/kubelet/kubelet_test.go | 10 +- pkg/kubelet/pleg/doc.go | 19 ++++ pkg/kubelet/pleg/generic.go | 141 ++++++++++++++++++++++++ pkg/kubelet/pleg/generic_test.go | 148 ++++++++++++++++++++++++++ pkg/kubelet/pleg/pleg.go | 50 +++++++++ pkg/kubelet/pod/manager.go | 18 ++-- 9 files changed, 431 insertions(+), 26 deletions(-) create mode 100644 pkg/kubelet/pleg/doc.go create mode 100644 pkg/kubelet/pleg/generic.go create mode 100644 pkg/kubelet/pleg/generic_test.go create mode 100644 pkg/kubelet/pleg/pleg.go diff --git a/pkg/kubelet/container/fake_runtime.go b/pkg/kubelet/container/fake_runtime.go index bb2bc72804f..de885369802 100644 --- a/pkg/kubelet/container/fake_runtime.go +++ b/pkg/kubelet/container/fake_runtime.go @@ -33,6 +33,7 @@ type FakeRuntime struct { sync.Mutex CalledFunctions []string PodList []*Pod + AllPodList []*Pod ImageList []Image PodStatus api.PodStatus StartedPods []string @@ -89,6 +90,7 @@ func (f *FakeRuntime) ClearCalls() { f.CalledFunctions = []string{} f.PodList = []*Pod{} + f.AllPodList = []*Pod{} f.PodStatus = api.PodStatus{} f.StartedPods = []string{} f.KilledPods = []string{} @@ -155,6 +157,9 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) { defer f.Unlock() f.CalledFunctions = append(f.CalledFunctions, "GetPods") + if all { + return f.AllPodList, f.Err + } return f.PodList, f.Err } diff --git a/pkg/kubelet/image_manager_test.go b/pkg/kubelet/image_manager_test.go index 906323c7508..73a19678546 100644 --- a/pkg/kubelet/image_manager_test.go +++ b/pkg/kubelet/image_manager_test.go @@ -84,7 +84,7 @@ func TestDetectImagesInitialDetect(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(1), @@ -114,7 +114,7 @@ func TestDetectImagesWithNewImage(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(1), @@ -159,7 +159,7 @@ func TestDetectImagesContainerStopped(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(1), @@ -175,7 +175,7 @@ func TestDetectImagesContainerStopped(t *testing.T) { require.True(t, ok) // Simulate container being stopped. - fakeRuntime.PodList = []*container.Pod{} + fakeRuntime.AllPodList = []*container.Pod{} err = manager.detectImages(time.Now()) require.NoError(t, err) assert.Equal(manager.imageRecordsLen(), 2) @@ -195,7 +195,7 @@ func TestDetectImagesWithRemovedImages(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(1), @@ -221,7 +221,7 @@ func TestFreeSpaceImagesInUseContainersAreIgnored(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(1), @@ -242,7 +242,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(0), @@ -253,7 +253,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) { // Make 1 be more recently used than 0. require.NoError(t, manager.detectImages(zero)) - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(1), @@ -261,7 +261,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) { }, } require.NoError(t, manager.detectImages(time.Now())) - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{}, }, @@ -281,7 +281,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) { fakeRuntime.ImageList = []container.Image{ makeImage(0, 1024), } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ makeContainer(0), @@ -296,7 +296,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) { makeImage(1, 2048), } require.NoError(t, manager.detectImages(time.Now())) - fakeRuntime.PodList = []*container.Pod{} + fakeRuntime.AllPodList = []*container.Pod{} require.NoError(t, manager.detectImages(time.Now())) require.Equal(t, manager.imageRecordsLen(), 2) @@ -317,7 +317,7 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoTags(t *testing.T) { Size: 2048, }, } - fakeRuntime.PodList = []*container.Pod{ + fakeRuntime.AllPodList = []*container.Pod{ { Containers: []*container.Container{ { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9c4f0807ac5..7c7de112e05 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/envvars" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/pleg" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/prober" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" @@ -111,6 +112,15 @@ const ( housekeepingPeriod = time.Second * 2 etcHostsPath = "/etc/hosts" + + // Capacity of the channel for recieving pod lifecycle events. This number + // is a bit arbitrary and may be adjusted in the future. + plegChannelCapacity = 1000 + + // Relisting is used to discover missing container events. + // Use a shorter period because generic PLEG relies on relisting for + // container events. + plegRelistPeriod = time.Second * 3 ) var ( @@ -351,6 +361,7 @@ func NewMainKubelet( serializeImagePulls, ) + klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod) case "rkt": conf := &rkt.Config{ Path: rktPath, @@ -372,6 +383,7 @@ func NewMainKubelet( } klet.containerRuntime = rktRuntime klet.imageManager = rkt.NewImageManager(rktRuntime) + klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod) // No Docker daemon to put in a container. dockerDaemonContainer = "" @@ -558,6 +570,9 @@ type Kubelet struct { // as it takes time to gather all necessary node information. nodeStatusUpdateFrequency time.Duration + // Generates pod events. + pleg pleg.PodLifecycleEventGenerator + // The name of the resource-only container to run the Kubelet in (empty for no container). // Name must be absolute. resourceContainer string @@ -871,6 +886,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // Run the system oom watcher forever. kl.statusManager.Start() + // Start the pod lifecycle event generator. + kl.pleg.Start() kl.syncLoop(updates, kl) } @@ -2124,20 +2141,21 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand // sync interval is defaulted to 10s. syncTicker := time.NewTicker(time.Second) housekeepingTicker := time.NewTicker(housekeepingPeriod) + plegCh := kl.pleg.Watch() for { if rs := kl.runtimeState.errors(); len(rs) != 0 { glog.Infof("skipping pod synchronization - %v", rs) time.Sleep(5 * time.Second) continue } - if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) { + if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) { break } } } func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler, - syncCh <-chan time.Time, housekeepingCh <-chan time.Time) bool { + syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { kl.syncLoopMonitor.Store(time.Now()) select { case u, open := <-updates: @@ -2146,6 +2164,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler return false } kl.addSource(u.Source) + switch u.Op { case kubetypes.ADD: glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, kubeletutil.FormatPodNames(u.Pods)) @@ -2160,6 +2179,25 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler // TODO: Do we want to support this? glog.Errorf("Kubelet does not support snapshot update") } + case e := <-plegCh: + // Filter out started events since we don't use them now. + if e.Type == pleg.ContainerStarted { + break + } + pod, ok := kl.podManager.GetPodByUID(e.ID) + if !ok { + // If the pod no longer exists, ignore the event. + glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e) + break + } + glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", kubeletutil.FormatPodName(pod), e) + // Force the container runtime cache to update. + if err := kl.runtimeCache.ForceUpdateIfOlder(time.Now()); err != nil { + glog.Errorf("SyncLoop: unable to update runtime cache") + // TODO (yujuhong): should we delay the sync until container + // runtime can be updated? + } + handler.HandlePodSyncs([]*api.Pod{pod}) case <-syncCh: podsToSync := kl.getPodsToSync() if len(podsToSync) == 0 { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 2431db12c74..637abac0ab4 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/pleg" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/prober" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" @@ -147,6 +148,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) kubelet.resyncInterval = 10 * time.Second kubelet.workQueue = queue.NewBasicWorkQueue() + // Relist period does not affect the tests. + kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour) return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} } @@ -338,15 +341,16 @@ func TestSyncLoopTimeUpdate(t *testing.T) { // Start sync ticker. syncCh := make(chan time.Time, 1) housekeepingCh := make(chan time.Time, 1) + plegCh := make(chan *pleg.PodLifecycleEvent) syncCh <- time.Now() - kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh) + kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh) loopTime2 := kubelet.LatestLoopEntryTime() if loopTime2.IsZero() { t.Errorf("Unexpected sync loop time: 0, expected non-zero value.") } syncCh <- time.Now() - kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh) + kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh) loopTime3 := kubelet.LatestLoopEntryTime() if !loopTime3.After(loopTime1) { t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp") @@ -366,7 +370,7 @@ func TestSyncLoopAbort(t *testing.T) { close(ch) // sanity check (also prevent this test from hanging in the next step) - ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time)) + ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1)) if ok { t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed") } diff --git a/pkg/kubelet/pleg/doc.go b/pkg/kubelet/pleg/doc.go new file mode 100644 index 00000000000..c8782ee8983 --- /dev/null +++ b/pkg/kubelet/pleg/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package pleg contains types and a generic implementation of the pod +// lifecycle event generator. +package pleg diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go new file mode 100644 index 00000000000..dd94b3a9988 --- /dev/null +++ b/pkg/kubelet/pleg/generic.go @@ -0,0 +1,141 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pleg + +import ( + "fmt" + "time" + + "github.com/golang/glog" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" +) + +// GenericPLEG is an extremely simple generic PLEG that relies solely on +// periodic listing to discover container changes. It should be be used +// as temporary replacement for container runtimes do not support a proper +// event generator yet. +// +// Note that GenericPLEG assumes that a container would not be created, +// terminated, and garbage collected within one relist period. If such an +// incident happens, GenenricPLEG would miss all events regarding this +// container. In the case of relisting failure, the window may become longer. +// Note that this assumption is not unique -- many kubelet internal components +// rely on terminated containers as tombstones for bookkeeping purposes. The +// garbage collector is implemented to work with such situtations. However, to +// guarantee that kubelet can handle missing container events, it is +// recommended to set the relist period short and have an auxiliary, longer +// periodic sync in kubelet as the safety net. +type GenericPLEG struct { + // The period for relisting. + relistPeriod time.Duration + // The container runtime. + runtime kubecontainer.Runtime + // The channel from which the subscriber listens events. + eventChannel chan *PodLifecycleEvent + // The internal cache for container information. + containers map[string]containerInfo +} + +type containerInfo struct { + podID types.UID + status kubecontainer.ContainerStatus +} + +func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int, + relistPeriod time.Duration) PodLifecycleEventGenerator { + return &GenericPLEG{ + relistPeriod: relistPeriod, + runtime: runtime, + eventChannel: make(chan *PodLifecycleEvent, channelCapacity), + containers: make(map[string]containerInfo), + } +} + +// Returns a channel from which the subscriber can recieve PodLifecycleEvent +// events. +// TODO: support multiple subscribers. +func (g *GenericPLEG) Watch() chan *PodLifecycleEvent { + return g.eventChannel +} + +// Start spawns a goroutine to relist periodically. +func (g *GenericPLEG) Start() { + go util.Until(g.relist, g.relistPeriod, util.NeverStop) +} + +func generateEvent(podID types.UID, cid string, oldStatus, newStatus kubecontainer.ContainerStatus) *PodLifecycleEvent { + if newStatus == oldStatus { + return nil + } + switch newStatus { + case kubecontainer.ContainerStatusRunning: + return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid} + case kubecontainer.ContainerStatusExited: + return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} + case kubecontainer.ContainerStatusUnknown: + // Don't generate any event if the status is unknown. + return nil + default: + panic(fmt.Sprintf("unrecognized container status: %v", newStatus)) + } + return nil +} + +// relist queries the container runtime for list of pods/containers, compare +// with the internal pods/containers, and generats events accordingly. +func (g *GenericPLEG) relist() { + glog.V(5).Infof("GenericPLEG: Relisting") + // Get all the pods. + pods, err := g.runtime.GetPods(true) + if err != nil { + glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err) + return + } + + events := []*PodLifecycleEvent{} + containers := make(map[string]containerInfo, len(g.containers)) + // Create a new containers map, compares container statuses, and generates + // correspoinding events. + for _, p := range pods { + for _, c := range p.Containers { + cid := c.ID.ID + // Get the of existing container info. Defaults to status unknown. + oldStatus := kubecontainer.ContainerStatusUnknown + if info, ok := g.containers[cid]; ok { + oldStatus = info.status + } + // Generate an event if required. + glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldStatus, c.Status) + if e := generateEvent(p.ID, cid, oldStatus, c.Status); e != nil { + events = append(events, e) + } + // Write to the new cache. + containers[cid] = containerInfo{podID: p.ID, status: c.Status} + } + } + + // Swap the container info cache. This is purely to avoid the need of + // garbage collection. + g.containers = containers + + // Send out the events. + for i := range events { + g.eventChannel <- events[i] + } +} diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go new file mode 100644 index 00000000000..2a974aaef90 --- /dev/null +++ b/pkg/kubelet/pleg/generic_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pleg + +import ( + "reflect" + "sort" + "testing" + "time" + + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util" +) + +const ( + testContainerRuntimeType = "fooRuntime" +) + +type TestGenericPLEG struct { + pleg *GenericPLEG + runtime *kubecontainer.FakeRuntime +} + +func newTestGenericPLEG() *TestGenericPLEG { + fakeRuntime := &kubecontainer.FakeRuntime{} + // The channel capacity should be large enough to hold all events in a + // single test. + pleg := &GenericPLEG{ + relistPeriod: time.Hour, + runtime: fakeRuntime, + eventChannel: make(chan *PodLifecycleEvent, 100), + containers: make(map[string]containerInfo), + } + return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime} +} + +func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent { + events := []*PodLifecycleEvent{} + for len(ch) > 0 { + e := <-ch + events = append(events, e) + } + return events +} + +func createTestContainer(ID string, status kubecontainer.ContainerStatus) *kubecontainer.Container { + return &kubecontainer.Container{ + ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID}, + Status: status, + } +} + +type sortableEvents []*PodLifecycleEvent + +func (a sortableEvents) Len() int { return len(a) } +func (a sortableEvents) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a sortableEvents) Less(i, j int) bool { + if a[i].ID != a[j].ID { + return a[i].ID < a[j].ID + } + return a[i].Data.(string) < a[j].Data.(string) +} + +func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) { + sort.Sort(sortableEvents(expected)) + sort.Sort(sortableEvents(actual)) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Actual events differ from the expected; diff: %v", util.ObjectDiff(expected, actual)) + } +} + +func TestRelisting(t *testing.T) { + testPleg := newTestGenericPLEG() + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + + // The first relist should send a PodSync event to each pod. + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStatusExited), + createTestContainer("c2", kubecontainer.ContainerStatusRunning), + createTestContainer("c3", kubecontainer.ContainerStatusUnknown), + }, + }, + { + ID: "4567", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStatusExited), + }, + }, + } + pleg.relist() + // Report every running/exited container if we see them for the first time. + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerStarted, Data: "c2"}, + {ID: "4567", Type: ContainerDied, Data: "c1"}, + {ID: "1234", Type: ContainerDied, Data: "c1"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) + + // The second relist should not send out any event because no container + // changed. + pleg.relist() + verifyEvents(t, expected, actual) + + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c2", kubecontainer.ContainerStatusExited), + createTestContainer("c3", kubecontainer.ContainerStatusRunning), + }, + }, + { + ID: "4567", + Containers: []*kubecontainer.Container{ + createTestContainer("c4", kubecontainer.ContainerStatusRunning), + }, + }, + } + pleg.relist() + // Only report containers that transitioned to running or exited status. + expected = []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerDied, Data: "c2"}, + {ID: "1234", Type: ContainerStarted, Data: "c3"}, + {ID: "4567", Type: ContainerStarted, Data: "c4"}, + } + + actual = getEventsFromChannel(ch) + verifyEvents(t, expected, actual) +} diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go new file mode 100644 index 00000000000..57a80f4481a --- /dev/null +++ b/pkg/kubelet/pleg/pleg.go @@ -0,0 +1,50 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pleg + +import ( + "k8s.io/kubernetes/pkg/types" +) + +type PodLifeCycleEventType string + +const ( + ContainerStarted PodLifeCycleEventType = "ContainerStarted" + ContainerDied PodLifeCycleEventType = "ContainerDied" + NetworkSetupCompleted PodLifeCycleEventType = "NetworkSetupCompleted" + NetworkFailed PodLifeCycleEventType = "NetworkFailed" + // PodSync is used to trigger syncing of a pod when the observed change of + // the state of the pod cannot be captured by any single event above. + PodSync PodLifeCycleEventType = "PodSync" +) + +// PodLifecycleEvent is an event that reflects the change of the pod state. +type PodLifecycleEvent struct { + // The pod ID. + ID types.UID + // The type of the event. + Type PodLifeCycleEventType + // The accompanied data which varies based on the event type. + // - ContainerStarted/ContainerStopped: the container name (string). + // - All other event types: unused. + Data interface{} +} + +type PodLifecycleEventGenerator interface { + Start() + Watch() chan *PodLifecycleEvent +} diff --git a/pkg/kubelet/pod/manager.go b/pkg/kubelet/pod/manager.go index 292ec99afcb..c57c13f4638 100644 --- a/pkg/kubelet/pod/manager.go +++ b/pkg/kubelet/pod/manager.go @@ -171,15 +171,8 @@ func (pm *basicManager) getAllPods() []*api.Pod { return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...) } -// GetPodByName provides the (non-mirror) pod that matches namespace and name, -// as well as whether the pod was found. -func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) { - podFullName := kubecontainer.BuildPodFullName(name, namespace) - return pm.GetPodByFullName(podFullName) -} - -// GetPodByUID provides the (non-mirror) pod that matches pod UID as well as -// whether the pod was found. +// GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as +// whether the pod is found. func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) { pm.lock.RLock() defer pm.lock.RUnlock() @@ -187,6 +180,13 @@ func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) { return pod, ok } +// GetPodByName provides the (non-mirror) pod that matches namespace and name, +// as well as whether the pod was found. +func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) { + podFullName := kubecontainer.BuildPodFullName(name, namespace) + return pm.GetPodByFullName(podFullName) +} + // GetPodByName returns the (non-mirror) pod that matches full name, as well as // whether the pod was found. func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) { From ac778e8203e1f6ea43555152c48f130063cc20c2 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Mon, 9 Nov 2015 10:01:53 -0800 Subject: [PATCH 3/3] Adjust the sync/backoff period Set resyncInterval to one minute now that we rely on the generic pleg to trigger pod syncs on container events. When there is an error during syncing, pod workers need to wake up sooner to retry. Set the sync error backoff period to 10 second in this case. --- cmd/kubelet/app/server.go | 2 +- docs/admin/kubelet.md | 4 ++-- pkg/kubelet/kubelet.go | 16 +++++++++------- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 845dbb1678a..3a6e83e2e25 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -216,7 +216,7 @@ func NewKubeletServer() *KubeletServer { RootDirectory: defaultRootDir, SerializeImagePulls: true, StreamingConnectionIdleTimeout: 5 * time.Minute, - SyncFrequency: 10 * time.Second, + SyncFrequency: 1 * time.Minute, SystemContainer: "", ReconcileCIDR: true, KubeAPIQPS: 5.0, diff --git a/docs/admin/kubelet.md b/docs/admin/kubelet.md index 6f9a99bcd73..4d4f8053241 100644 --- a/docs/admin/kubelet.md +++ b/docs/admin/kubelet.md @@ -131,13 +131,13 @@ kubelet --runonce[=false]: If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server --serialize-image-pulls[=true]: Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true] --streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m' - --sync-frequency=10s: Max period between synchronizing running containers and config + --sync-frequency=1m0s: Max period between synchronizing running containers and config --system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: ""). --tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to the directory passed to --cert-dir. --tls-private-key-file="": File containing x509 private key matching --tls-cert-file. ``` -###### Auto generated by spf13/cobra on 10-Nov-2015 +###### Auto generated by spf13/cobra on 11-Nov-2015 diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7c7de112e05..1e84ab8d785 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -117,10 +117,14 @@ const ( // is a bit arbitrary and may be adjusted in the future. plegChannelCapacity = 1000 - // Relisting is used to discover missing container events. - // Use a shorter period because generic PLEG relies on relisting for - // container events. + // Generic PLEG relies on relisting for discovering container events. + // The period directly affects the response time of kubelet. plegRelistPeriod = time.Second * 3 + + // backOffPeriod is the period to back off when pod syncing resulting in an + // error. It is also used as the base period for the exponential backoff + // container restarts and image pulls. + backOffPeriod = time.Second * 10 ) var ( @@ -437,11 +441,9 @@ func NewMainKubelet( } klet.runtimeCache = runtimeCache klet.workQueue = queue.NewBasicWorkQueue() - // TODO(yujuhong): backoff and resync interval should be set differently - // once we switch to using pod event generator. - klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval) + klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod) - klet.backOff = util.NewBackOff(resyncInterval, MaxContainerBackOff) + klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity) klet.sourcesSeen = sets.NewString() return klet, nil