Merge pull request #13571 from yujuhong/lifecycle_v0

kubelet: add a generic pod lifecycle event generator
Jeff Lowdermilk 2015-11-16 14:59:40 -08:00
commit 70d89a3541
16 changed files with 527 additions and 58 deletions


@ -216,7 +216,7 @@ func NewKubeletServer() *KubeletServer {
RootDirectory: defaultRootDir,
SerializeImagePulls: true,
StreamingConnectionIdleTimeout: 5 * time.Minute,
SyncFrequency: 10 * time.Second,
SyncFrequency: 1 * time.Minute,
SystemContainer: "",
ReconcileCIDR: true,
KubeAPIQPS: 5.0,


@ -131,13 +131,13 @@ kubelet
--runonce[=false]: If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server
--serialize-image-pulls[=true]: Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]
--streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'
--sync-frequency=10s: Max period between synchronizing running containers and config
--sync-frequency=1m0s: Max period between synchronizing running containers and config
--system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: "").
--tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to the directory passed to --cert-dir.
--tls-private-key-file="": File containing x509 private key matching --tls-cert-file.
```
###### Auto generated by spf13/cobra on 10-Nov-2015
###### Auto generated by spf13/cobra on 11-Nov-2015
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->


@ -33,6 +33,7 @@ type FakeRuntime struct {
sync.Mutex
CalledFunctions []string
PodList []*Pod
AllPodList []*Pod
ImageList []Image
PodStatus api.PodStatus
StartedPods []string
@ -89,6 +90,7 @@ func (f *FakeRuntime) ClearCalls() {
f.CalledFunctions = []string{}
f.PodList = []*Pod{}
f.AllPodList = []*Pod{}
f.PodStatus = api.PodStatus{}
f.StartedPods = []string{}
f.KilledPods = []string{}
@ -155,6 +157,9 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetPods")
if all {
return f.AllPodList, f.Err
}
return f.PodList, f.Err
}
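Not part of the diff: a minimal test sketch of the new split between PodList and AllPodList. The test name is hypothetical; GetPods(false) serves only the "running" list, while GetPods(true) serves the full list.

```go
package container_test

import (
	"testing"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// Hypothetical test, not from this PR: GetPods(false) returns PodList,
// GetPods(true) returns AllPodList.
func TestFakeGetPodsAll(t *testing.T) {
	f := &kubecontainer.FakeRuntime{
		PodList:    []*kubecontainer.Pod{{ID: "running-pod"}},
		AllPodList: []*kubecontainer.Pod{{ID: "running-pod"}, {ID: "exited-pod"}},
	}
	if pods, _ := f.GetPods(false); len(pods) != 1 {
		t.Errorf("GetPods(false): expected 1 pod, got %d", len(pods))
	}
	if pods, _ := f.GetPods(true); len(pods) != 2 {
		t.Errorf("GetPods(true): expected 2 pods, got %d", len(pods))
	}
}
```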


@ -198,6 +198,15 @@ func (c *ContainerID) UnmarshalJSON(data []byte) error {
return c.ParseString(string(data))
}
type ContainerStatus string
const (
ContainerStatusRunning ContainerStatus = "running"
ContainerStatusExited ContainerStatus = "exited"
// This unknown encompasses all the statuses that we currently don't care about.
ContainerStatusUnknown ContainerStatus = "unknown"
)
// Container provides the runtime information for a container, such as ID, hash,
// status of the container.
type Container struct {
@ -215,6 +224,8 @@ type Container struct {
// The timestamp of the creation time of the container.
// TODO(yifan): Consider to move it to api.ContainerStatus.
Created int64
// Status is the status of the container.
Status ContainerStatus
}
// Basic information about a container image.
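For illustration only (not from this PR): a small sketch of how a caller can branch on the new Status field, e.g. to keep only running containers from a full listing. The helper name runningContainers is hypothetical.

```go
package main

import (
	"fmt"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// runningContainers keeps only the containers reported as running.
func runningContainers(cs []*kubecontainer.Container) []*kubecontainer.Container {
	var out []*kubecontainer.Container
	for _, c := range cs {
		if c.Status == kubecontainer.ContainerStatusRunning {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	cs := []*kubecontainer.Container{
		{Name: "a", Status: kubecontainer.ContainerStatusRunning},
		{Name: "b", Status: kubecontainer.ContainerStatusExited},
		{Name: "c", Status: kubecontainer.ContainerStatusUnknown},
	}
	fmt.Printf("%d of %d containers are running\n", len(runningContainers(cs)), len(cs))
}
```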


@ -18,6 +18,7 @@ package dockertools
import (
"fmt"
"strings"
docker "github.com/fsouza/go-dockerclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -27,6 +28,19 @@ import (
// This file contains helper functions to convert docker API types to runtime
// (kubecontainer) types.
func mapStatus(status string) kubecontainer.ContainerStatus {
// Parse the status string in docker.APIContainers. This could break when
// we upgrade docker.
switch {
case strings.HasPrefix(status, "Up"):
return kubecontainer.ContainerStatusRunning
case strings.HasPrefix(status, "Exited"):
return kubecontainer.ContainerStatusExited
default:
return kubecontainer.ContainerStatusUnknown
}
}
// Converts docker.APIContainers to kubecontainer.Container.
func toRuntimeContainer(c *docker.APIContainers) (*kubecontainer.Container, error) {
if c == nil {
@ -37,12 +51,14 @@ func toRuntimeContainer(c *docker.APIContainers) (*kubecontainer.Container, erro
if err != nil {
return nil, err
}
return &kubecontainer.Container{
ID: kubetypes.DockerID(c.ID).ContainerID(),
Name: dockerName.ContainerName,
Image: c.Image,
Hash: hash,
Created: c.Created,
Status: mapStatus(c.Status),
}, nil
}


@ -24,12 +24,31 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
func TestMapStatus(t *testing.T) {
testCases := []struct {
input string
expected kubecontainer.ContainerStatus
}{
{input: "Up 5 hours", expected: kubecontainer.ContainerStatusRunning},
{input: "Exited (0) 2 hours ago", expected: kubecontainer.ContainerStatusExited},
{input: "Created", expected: kubecontainer.ContainerStatusUnknown},
{input: "Random string", expected: kubecontainer.ContainerStatusUnknown},
}
for i, test := range testCases {
if actual := mapStatus(test.input); actual != test.expected {
t.Errorf("Test[%d]: expected %q, got %q", i, test.expected, actual)
}
}
}
func TestToRuntimeContainer(t *testing.T) {
original := &docker.APIContainers{
ID: "ab2cdf",
Image: "bar_image",
Created: 12345,
Names: []string{"/k8s_bar.5678_foo_ns_1234_42"},
Status: "Up 5 hours",
}
expected := &kubecontainer.Container{
ID: kubecontainer.ContainerID{"docker", "ab2cdf"},
@ -37,6 +56,7 @@ func TestToRuntimeContainer(t *testing.T) {
Image: "bar_image",
Hash: 0x5678,
Created: 12345,
Status: kubecontainer.ContainerStatusRunning,
}
actual, err := toRuntimeContainer(original)


@ -583,11 +583,13 @@ func TestFindContainersByPod(t *testing.T) {
ID: kubetypes.DockerID("foobar").ContainerID(),
Name: "foobar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
{
ID: kubetypes.DockerID("baz").ContainerID(),
Name: "baz",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
},
},
@ -600,6 +602,7 @@ func TestFindContainersByPod(t *testing.T) {
ID: kubetypes.DockerID("barbar").ContainerID(),
Name: "barbar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
},
},
@ -641,16 +644,19 @@ func TestFindContainersByPod(t *testing.T) {
ID: kubetypes.DockerID("foobar").ContainerID(),
Name: "foobar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
{
ID: kubetypes.DockerID("barfoo").ContainerID(),
Name: "barfoo",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
{
ID: kubetypes.DockerID("baz").ContainerID(),
Name: "baz",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
},
},
@ -663,6 +669,7 @@ func TestFindContainersByPod(t *testing.T) {
ID: kubetypes.DockerID("barbar").ContainerID(),
Name: "barbar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
},
},
@ -675,6 +682,7 @@ func TestFindContainersByPod(t *testing.T) {
ID: kubetypes.DockerID("bazbaz").ContainerID(),
Name: "bazbaz",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
},
},
},


@ -84,7 +84,7 @@ func TestDetectImagesInitialDetect(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(1),
@ -114,7 +114,7 @@ func TestDetectImagesWithNewImage(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(1),
@ -159,7 +159,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(1),
@ -175,7 +175,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
require.True(t, ok)
// Simulate container being stopped.
fakeRuntime.PodList = []*container.Pod{}
fakeRuntime.AllPodList = []*container.Pod{}
err = manager.detectImages(time.Now())
require.NoError(t, err)
assert.Equal(manager.imageRecordsLen(), 2)
@ -195,7 +195,7 @@ func TestDetectImagesWithRemovedImages(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(1),
@ -221,7 +221,7 @@ func TestFreeSpaceImagesInUseContainersAreIgnored(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(1),
@ -242,7 +242,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(0),
@ -253,7 +253,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
// Make 1 be more recently used than 0.
require.NoError(t, manager.detectImages(zero))
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(1),
@ -261,7 +261,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
},
}
require.NoError(t, manager.detectImages(time.Now()))
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{},
},
@ -281,7 +281,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
fakeRuntime.ImageList = []container.Image{
makeImage(0, 1024),
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
makeContainer(0),
@ -296,7 +296,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
makeImage(1, 2048),
}
require.NoError(t, manager.detectImages(time.Now()))
fakeRuntime.PodList = []*container.Pod{}
fakeRuntime.AllPodList = []*container.Pod{}
require.NoError(t, manager.detectImages(time.Now()))
require.Equal(t, manager.imageRecordsLen(), 2)
@ -317,7 +317,7 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoTags(t *testing.T) {
Size: 2048,
},
}
fakeRuntime.PodList = []*container.Pod{
fakeRuntime.AllPodList = []*container.Pod{
{
Containers: []*container.Container{
{


@ -55,6 +55,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
@ -111,6 +112,19 @@ const (
housekeepingPeriod = time.Second * 2
etcHostsPath = "/etc/hosts"
// Capacity of the channel for receiving pod lifecycle events. This number
// is a bit arbitrary and may be adjusted in the future.
plegChannelCapacity = 1000
// Generic PLEG relies on relisting for discovering container events.
// The period directly affects the response time of kubelet.
plegRelistPeriod = time.Second * 3
// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff of
// container restarts and image pulls.
backOffPeriod = time.Second * 10
)
var (
@ -351,6 +365,7 @@ func NewMainKubelet(
serializeImagePulls,
)
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
case "rkt":
conf := &rkt.Config{
Path: rktPath,
@ -372,6 +387,7 @@ func NewMainKubelet(
}
klet.containerRuntime = rktRuntime
klet.imageManager = rkt.NewImageManager(rktRuntime)
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
// No Docker daemon to put in a container.
dockerDaemonContainer = ""
@ -425,11 +441,9 @@ func NewMainKubelet(
}
klet.runtimeCache = runtimeCache
klet.workQueue = queue.NewBasicWorkQueue()
// TODO(yujuhong): backoff and resync interval should be set differently
// once we switch to using pod event generator.
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval)
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod)
klet.backOff = util.NewBackOff(resyncInterval, MaxContainerBackOff)
klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
klet.sourcesSeen = sets.NewString()
return klet, nil
@ -558,6 +572,9 @@ type Kubelet struct {
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator
// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
@ -871,6 +888,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Run the system oom watcher forever.
kl.statusManager.Start()
// Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(updates, kl)
}
@ -2124,20 +2143,21 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
housekeepingTicker := time.NewTicker(housekeepingPeriod)
plegCh := kl.pleg.Watch()
for {
if rs := kl.runtimeState.errors(); len(rs) != 0 {
glog.Infof("skipping pod synchronization - %v", rs)
time.Sleep(5 * time.Second)
continue
}
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) {
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
}
}
func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time) bool {
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
kl.syncLoopMonitor.Store(time.Now())
select {
case u, open := <-updates:
@ -2146,6 +2166,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
return false
}
kl.addSource(u.Source)
switch u.Op {
case kubetypes.ADD:
glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, kubeletutil.FormatPodNames(u.Pods))
@ -2160,6 +2181,25 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")
}
case e := <-plegCh:
// Filter out started events since we don't use them now.
if e.Type == pleg.ContainerStarted {
break
}
pod, ok := kl.podManager.GetPodByUID(e.ID)
if !ok {
// If the pod no longer exists, ignore the event.
glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
break
}
glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", kubeletutil.FormatPodName(pod), e)
// Force the container runtime cache to update.
if err := kl.runtimeCache.ForceUpdateIfOlder(time.Now()); err != nil {
glog.Errorf("SyncLoop: unable to update runtime cache")
// TODO (yujuhong): should we delay the sync until container
// runtime can be updated?
}
handler.HandlePodSyncs([]*api.Pod{pod})
case <-syncCh:
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
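Not part of the diff: a distilled, self-contained sketch of the PLEG dispatch logic added to syncLoopIteration above. lookupPod and syncPod are hypothetical stand-ins for podManager.GetPodByUID and handler.HandlePodSyncs, and the forced runtime-cache update is omitted.

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/pleg"
	"k8s.io/kubernetes/pkg/types"
)

// consume mirrors the plegCh case: drop started events, ignore events for
// pods that no longer exist, and force a sync of the affected pod otherwise.
func consume(plegCh <-chan *pleg.PodLifecycleEvent,
	lookupPod func(types.UID) (string, bool), syncPod func(string)) {
	for e := range plegCh {
		if e.Type == pleg.ContainerStarted {
			continue // started events are not acted on yet
		}
		name, ok := lookupPod(e.ID)
		if !ok {
			continue // the pod is gone; the event is irrelevant
		}
		syncPod(name)
	}
}

func main() {
	ch := make(chan *pleg.PodLifecycleEvent, 2)
	ch <- &pleg.PodLifecycleEvent{ID: "1234", Type: pleg.ContainerDied, Data: "c1"}
	ch <- &pleg.PodLifecycleEvent{ID: "9999", Type: pleg.ContainerStarted, Data: "c2"}
	close(ch)
	consume(ch,
		func(uid types.UID) (string, bool) { return "ns/foo", uid == "1234" },
		func(name string) { fmt.Println("syncing", name) })
}
```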


@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
@ -147,6 +148,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
kubelet.resyncInterval = 10 * time.Second
kubelet.workQueue = queue.NewBasicWorkQueue()
// Relist period does not affect the tests.
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
}
@ -338,15 +341,16 @@ func TestSyncLoopTimeUpdate(t *testing.T) {
// Start sync ticker.
syncCh := make(chan time.Time, 1)
housekeepingCh := make(chan time.Time, 1)
plegCh := make(chan *pleg.PodLifecycleEvent)
syncCh <- time.Now()
kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh)
loopTime2 := kubelet.LatestLoopEntryTime()
if loopTime2.IsZero() {
t.Errorf("Unexpected sync loop time: 0, expected non-zero value.")
}
syncCh <- time.Now()
kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh)
loopTime3 := kubelet.LatestLoopEntryTime()
if !loopTime3.After(loopTime1) {
t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp")
@ -366,7 +370,7 @@ func TestSyncLoopAbort(t *testing.T) {
close(ch)
// sanity check (also prevent this test from hanging in the next step)
ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time))
ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1))
if ok {
t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed")
}

pkg/kubelet/pleg/doc.go (new file, 19 lines)

@ -0,0 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package pleg contains types and a generic implementation of the pod
// lifecycle event generator.
package pleg

pkg/kubelet/pleg/generic.go (new file, 141 lines)

@ -0,0 +1,141 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pleg
import (
"fmt"
"time"
"github.com/golang/glog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
// GenericPLEG is an extremely simple generic PLEG that relies solely on
// periodic listing to discover container changes. It should be used as a
// temporary replacement for container runtimes that do not support a proper
// event generator yet.
//
// Note that GenericPLEG assumes that a container would not be created,
// terminated, and garbage collected within one relist period. If such an
// incident happens, GenericPLEG would miss all events regarding this
// container. In the case of relisting failure, the window may become longer.
// Note that this assumption is not unique -- many kubelet internal components
// rely on terminated containers as tombstones for bookkeeping purposes. The
// garbage collector is implemented to work with such situations. However, to
// guarantee that kubelet can handle missing container events, it is
// recommended to set the relist period short and have an auxiliary, longer
// periodic sync in kubelet as the safety net.
type GenericPLEG struct {
// The period for relisting.
relistPeriod time.Duration
// The container runtime.
runtime kubecontainer.Runtime
// The channel from which the subscriber listens for events.
eventChannel chan *PodLifecycleEvent
// The internal cache for container information.
containers map[string]containerInfo
}
type containerInfo struct {
podID types.UID
status kubecontainer.ContainerStatus
}
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
relistPeriod time.Duration) PodLifecycleEventGenerator {
return &GenericPLEG{
relistPeriod: relistPeriod,
runtime: runtime,
eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
containers: make(map[string]containerInfo),
}
}
// Returns a channel from which the subscriber can receive PodLifecycleEvent
// events.
// TODO: support multiple subscribers.
func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
return g.eventChannel
}
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
go util.Until(g.relist, g.relistPeriod, util.NeverStop)
}
func generateEvent(podID types.UID, cid string, oldStatus, newStatus kubecontainer.ContainerStatus) *PodLifecycleEvent {
if newStatus == oldStatus {
return nil
}
switch newStatus {
case kubecontainer.ContainerStatusRunning:
return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid}
case kubecontainer.ContainerStatusExited:
return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
case kubecontainer.ContainerStatusUnknown:
// Don't generate any event if the status is unknown.
return nil
default:
panic(fmt.Sprintf("unrecognized container status: %v", newStatus))
}
return nil
}
// relist queries the container runtime for the list of pods/containers, compares
// it with the internal pods/containers, and generates events accordingly.
func (g *GenericPLEG) relist() {
glog.V(5).Infof("GenericPLEG: Relisting")
// Get all the pods.
pods, err := g.runtime.GetPods(true)
if err != nil {
glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
return
}
events := []*PodLifecycleEvent{}
containers := make(map[string]containerInfo, len(g.containers))
// Create a new containers map, compare container statuses, and generate
// corresponding events.
for _, p := range pods {
for _, c := range p.Containers {
cid := c.ID.ID
// Get the status of the existing container; default to status unknown.
oldStatus := kubecontainer.ContainerStatusUnknown
if info, ok := g.containers[cid]; ok {
oldStatus = info.status
}
// Generate an event if required.
glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldStatus, c.Status)
if e := generateEvent(p.ID, cid, oldStatus, c.Status); e != nil {
events = append(events, e)
}
// Write to the new cache.
containers[cid] = containerInfo{podID: p.ID, status: c.Status}
}
}
// Swap the container info cache. This is purely to avoid the need for
// garbage collection.
g.containers = containers
// Send out the events.
for i := range events {
g.eventChannel <- events[i]
}
}
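Not from the PR: a minimal wiring sketch using the FakeRuntime extended earlier in this change as the stand-in runtime. After Start, the first relist reports the running container as a ContainerStarted event on the Watch channel; the capacity and relist period below are arbitrary.

```go
package main

import (
	"fmt"
	"time"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

func main() {
	// A fake runtime whose full listing contains one running container.
	runtime := &kubecontainer.FakeRuntime{
		AllPodList: []*kubecontainer.Pod{
			{
				ID: "1234",
				Containers: []*kubecontainer.Container{
					{
						ID:     kubecontainer.ContainerID{Type: "fake", ID: "c1"},
						Status: kubecontainer.ContainerStatusRunning,
					},
				},
			},
		},
	}
	g := pleg.NewGenericPLEG(runtime, 10, time.Second)
	g.Start()
	e := <-g.Watch()
	fmt.Printf("pod %s: %s %v\n", e.ID, e.Type, e.Data) // pod 1234: ContainerStarted c1
}
```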


@ -0,0 +1,148 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pleg
import (
"reflect"
"sort"
"testing"
"time"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util"
)
const (
testContainerRuntimeType = "fooRuntime"
)
type TestGenericPLEG struct {
pleg *GenericPLEG
runtime *kubecontainer.FakeRuntime
}
func newTestGenericPLEG() *TestGenericPLEG {
fakeRuntime := &kubecontainer.FakeRuntime{}
// The channel capacity should be large enough to hold all events in a
// single test.
pleg := &GenericPLEG{
relistPeriod: time.Hour,
runtime: fakeRuntime,
eventChannel: make(chan *PodLifecycleEvent, 100),
containers: make(map[string]containerInfo),
}
return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
}
func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
events := []*PodLifecycleEvent{}
for len(ch) > 0 {
e := <-ch
events = append(events, e)
}
return events
}
func createTestContainer(ID string, status kubecontainer.ContainerStatus) *kubecontainer.Container {
return &kubecontainer.Container{
ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID},
Status: status,
}
}
type sortableEvents []*PodLifecycleEvent
func (a sortableEvents) Len() int { return len(a) }
func (a sortableEvents) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a sortableEvents) Less(i, j int) bool {
if a[i].ID != a[j].ID {
return a[i].ID < a[j].ID
}
return a[i].Data.(string) < a[j].Data.(string)
}
func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) {
sort.Sort(sortableEvents(expected))
sort.Sort(sortableEvents(actual))
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Actual events differ from the expected; diff: %v", util.ObjectDiff(expected, actual))
}
}
func TestRelisting(t *testing.T) {
testPleg := newTestGenericPLEG()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
// Set up the initial pods/containers for the first relist.
runtime.AllPodList = []*kubecontainer.Pod{
{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStatusExited),
createTestContainer("c2", kubecontainer.ContainerStatusRunning),
createTestContainer("c3", kubecontainer.ContainerStatusUnknown),
},
},
{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStatusExited),
},
},
}
pleg.relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
{ID: "4567", Type: ContainerDied, Data: "c1"},
{ID: "1234", Type: ContainerDied, Data: "c1"},
}
actual := getEventsFromChannel(ch)
verifyEvents(t, expected, actual)
// The second relist should not send out any event because no container
// changed.
pleg.relist()
verifyEvents(t, expected, actual)
runtime.AllPodList = []*kubecontainer.Pod{
{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c2", kubecontainer.ContainerStatusExited),
createTestContainer("c3", kubecontainer.ContainerStatusRunning),
},
},
{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c4", kubecontainer.ContainerStatusRunning),
},
},
}
pleg.relist()
// Only report containers that transitioned to running or exited status.
expected = []*PodLifecycleEvent{
{ID: "1234", Type: ContainerDied, Data: "c2"},
{ID: "1234", Type: ContainerStarted, Data: "c3"},
{ID: "4567", Type: ContainerStarted, Data: "c4"},
}
actual = getEventsFromChannel(ch)
verifyEvents(t, expected, actual)
}

pkg/kubelet/pleg/pleg.go (new file, 50 lines)

@ -0,0 +1,50 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pleg
import (
"k8s.io/kubernetes/pkg/types"
)
type PodLifeCycleEventType string
const (
ContainerStarted PodLifeCycleEventType = "ContainerStarted"
ContainerDied PodLifeCycleEventType = "ContainerDied"
NetworkSetupCompleted PodLifeCycleEventType = "NetworkSetupCompleted"
NetworkFailed PodLifeCycleEventType = "NetworkFailed"
// PodSync is used to trigger syncing of a pod when the observed change of
// the state of the pod cannot be captured by any single event above.
PodSync PodLifeCycleEventType = "PodSync"
)
// PodLifecycleEvent is an event that reflects the change of the pod state.
type PodLifecycleEvent struct {
// The pod ID.
ID types.UID
// The type of the event.
Type PodLifeCycleEventType
// The accompanying data, which varies based on the event type:
// - ContainerStarted/ContainerDied: the container ID (string).
// - All other event types: unused.
Data interface{}
}
type PodLifecycleEventGenerator interface {
Start()
Watch() chan *PodLifecycleEvent
}
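Not part of the diff: a short sketch of a subscriber handling each event type defined above. The handle function is hypothetical; per the generic implementation, Data carries the container ID for container events.

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// handle is a hypothetical subscriber callback switching on the event types.
func handle(e *pleg.PodLifecycleEvent) {
	switch e.Type {
	case pleg.ContainerStarted:
		fmt.Printf("pod %s: container %v started\n", e.ID, e.Data)
	case pleg.ContainerDied:
		fmt.Printf("pod %s: container %v died\n", e.ID, e.Data)
	case pleg.NetworkSetupCompleted, pleg.NetworkFailed:
		fmt.Printf("pod %s: network event %s\n", e.ID, e.Type)
	case pleg.PodSync:
		fmt.Printf("pod %s: full sync requested\n", e.ID)
	}
}

func main() {
	handle(&pleg.PodLifecycleEvent{ID: "1234", Type: pleg.ContainerDied, Data: "c1"})
	handle(&pleg.PodLifecycleEvent{ID: "1234", Type: pleg.PodSync})
}
```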


@ -171,15 +171,8 @@ func (pm *basicManager) getAllPods() []*api.Pod {
return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
}
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
// as well as whether the pod was found.
func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
podFullName := kubecontainer.BuildPodFullName(name, namespace)
return pm.GetPodByFullName(podFullName)
}
// GetPodByUID provides the (non-mirror) pod that matches pod UID as well as
// whether the pod was found.
// GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
// whether the pod is found.
func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
@ -187,6 +180,13 @@ func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
return pod, ok
}
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
// as well as whether the pod was found.
func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
podFullName := kubecontainer.BuildPodFullName(name, namespace)
return pm.GetPodByFullName(podFullName)
}
// GetPodByFullName returns the (non-mirror) pod that matches full name, as well
// as whether the pod was found.
func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {


@ -793,7 +793,14 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
var pods []*kubecontainer.Pod
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
if !all && u.SubState != "running" {
var status kubecontainer.ContainerStatus
switch {
case u.SubState == "running":
status = kubecontainer.ContainerStatusRunning
default:
status = kubecontainer.ContainerStatusExited
}
if !all && status != kubecontainer.ContainerStatusRunning {
continue
}
pod, _, err := r.readServiceFile(u.Name)