Revert "Adding sync pod latency metric." and "Thread-per-pod model in Kubelet."

This reverts commits 744f33d886 and
7191c5c7fd.
This commit is contained in:
Jeff Grafton 2015-02-26 12:14:48 -08:00
parent e455ee5d2e
commit c2e7e2d029
4 changed files with 139 additions and 291 deletions

View File

@ -61,10 +61,7 @@ const podOomScoreAdj = -100
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
SyncPods([]api.BoundPod) error
}
type SourceReadyFn func(source string) bool
@ -114,6 +111,7 @@ func NewMainKubelet(
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
podInfraContainerImage: podInfraContainerImage,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{},
@ -136,7 +134,6 @@ func NewMainKubelet(
return nil, err
}
klet.dockerCache = dockerCache
klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod)
metrics.Register(dockerCache)
@ -456,6 +453,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl)
}
// Per-pod workers.
type podWorkers struct {
lock sync.Mutex
// Set of pods with existing workers.
workers util.StringSet
}
func newPodWorkers() *podWorkers {
return &podWorkers{
workers: util.NewStringSet(),
}
}
// Runs a worker for "podFullName" asynchronously with the specified "action".
// If the worker for the "podFullName" is already running, functions as a no-op.
func (self *podWorkers) Run(podFullName string, action func()) {
self.lock.Lock()
defer self.lock.Unlock()
// This worker is already running, let it finish.
if self.workers.Has(podFullName) {
return
}
self.workers.Insert(podFullName)
// Run worker async.
go func() {
defer util.HandleCrash()
action()
self.lock.Lock()
defer self.lock.Unlock()
self.workers.Delete(podFullName)
}()
}
func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
for _, mount := range container.VolumeMounts {
@ -945,7 +979,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
start := time.Now()
defer func() {
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()))
}()
if err := kl.dockerPuller.Pull(img); err != nil {
@ -1273,7 +1307,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
@ -1299,14 +1333,13 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metr
}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
kl.podWorkers.Run(podFullName, func() {
if err := kl.syncPod(pod, dockerContainers); err != nil {
glog.Errorf("Error syncing pod, skipping: %v", err)
record.Eventf(pod, "failedSync", "Error syncing pod, skipping: %v", err)
}
})
}
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
// Kill any containers we don't need.
killed := []string{}
for ix := range dockerContainers {
@ -1421,21 +1454,19 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
unsyncedPod := false
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.updatePods(u)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.updatePods(u)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
@ -1447,54 +1478,25 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Errorf("Failed to get bound pods.")
return
}
if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
if err := handler.SyncPods(pods); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
}
}
// Updated the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
func (kl *Kubelet) updatePods(u PodUpdate) {
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for i := range u.Pods {
if _, ok := existingPods[u.Pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
}
}
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
// Mark all remaining pods as sync.
for i := range kl.pods {
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
}
}
}
// Returns Docker version for this Kubelet.

View File

@ -34,7 +34,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -49,15 +48,14 @@ func init() {
util.ReallyCrash = true
}
func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup) {
func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) {
fakeDocker := &dockertools.FakeDockerClient{
RemovedImages: util.StringSet{},
}
fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker)
kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker
kubelet.dockerCache = fakeDockerCache
kubelet.dockerCache = dockertools.NewFakeDockerCache(fakeDocker)
kubelet.dockerPuller = &dockertools.FakeDockerPuller{}
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
@ -67,14 +65,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil {
t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err)
}
waitGroup := new(sync.WaitGroup)
kubelet.podWorkers = newPodWorkers(
fakeDockerCache,
func(pod *api.BoundPod, containers dockertools.DockerContainers) error {
err := kubelet.syncPod(pod, containers)
waitGroup.Done()
return err
})
kubelet.podWorkers = newPodWorkers()
kubelet.sourceReady = func(source string) bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
@ -83,7 +74,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
return kubelet, fakeDocker, waitGroup
return kubelet, fakeDocker
}
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
@ -135,7 +126,7 @@ func verifyBoolean(t *testing.T, expected, value bool) {
}
func TestKubeletDirs(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
root := kubelet.rootDirectory
var exp, got string
@ -196,7 +187,7 @@ func TestKubeletDirs(t *testing.T) {
}
func TestKubeletDirsCompat(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
root := kubelet.rootDirectory
if err := os.MkdirAll(root, 0750); err != nil {
t.Fatalf("can't mkdir(%q): %s", root, err)
@ -302,7 +293,7 @@ func TestKillContainerWithError(t *testing.T) {
Err: fmt.Errorf("sample error"),
ContainerList: append([]docker.APIContainers{}, containers...),
}
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
for _, c := range fakeDocker.ContainerList {
kubelet.readiness.set(c.ID, true)
}
@ -333,7 +324,7 @@ func TestKillContainer(t *testing.T) {
Names: []string{"/k8s_bar_qux_5678_42"},
},
}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...)
fakeDocker.Container = &docker.Container{
Name: "foobar",
@ -383,10 +374,8 @@ func (cr *channelReader) GetList() [][]api.BoundPod {
return cr.list
}
var emptyPodUIDs map[types.UID]metrics.SyncPodType
func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
container := api.Container{Name: "bar"}
fakeDocker.ContainerList = []docker.APIContainers{
{
@ -415,17 +404,16 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"})
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{"list", "list", "inspect_container", "inspect_container"})
}
func TestSyncPodsWithTerminationLog(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
container := api.Container{
Name: "bar",
TerminationMessagePath: "/dev/somepath",
@ -446,14 +434,13 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock()
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
@ -466,6 +453,19 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
fakeDocker.Unlock()
}
// drainWorkers waits until all workers are done. Should only used for testing.
func (kl *Kubelet) drainWorkers() {
for {
kl.podWorkers.lock.Lock()
length := len(kl.podWorkers.workers)
kl.podWorkers.lock.Unlock()
if length == 0 {
return
}
time.Sleep(time.Millisecond * 100)
}
}
func matchString(t *testing.T, pattern, str string) bool {
match, err := regexp.MatchString(pattern, str)
if err != nil {
@ -475,7 +475,7 @@ func matchString(t *testing.T, pattern, str string) bool {
}
func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.podInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.pods = []api.BoundPod{
@ -493,15 +493,14 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock()
@ -524,7 +523,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
}
func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{}
kubelet.podInfraContainerImage = "custom_image_name"
@ -544,15 +543,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock()
@ -569,7 +567,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
}
func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
// pod infra container
@ -592,15 +590,14 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
@ -611,7 +608,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
}
func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeHttp := fakeHTTP{}
kubelet.httpClient = &fakeHttp
fakeDocker.ContainerList = []docker.APIContainers{
@ -647,15 +644,14 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
@ -669,7 +665,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
}
func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
// format is // k8s_<container-id>_<pod-fullname>_<pod-uid>
@ -692,15 +688,14 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
// A map iteration is used to delete containers, so must not depend on
// order here.
@ -716,7 +711,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool { return ready }
fakeDocker.ContainerList = []docker.APIContainers{
@ -731,7 +726,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
@ -739,7 +734,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@ -759,7 +754,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ready := false
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool {
if source == "testSource" {
return ready
@ -790,7 +785,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
@ -798,7 +793,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@ -819,7 +814,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
}
func TestSyncPodsDeletes(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
@ -836,7 +831,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
err := kubelet.SyncPods([]api.BoundPod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -857,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) {
}
func TestSyncPodDeletesDuplicate(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
@ -907,7 +902,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
}
func TestSyncPodBadHash(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
@ -956,7 +951,7 @@ func TestSyncPodBadHash(t *testing.T) {
}
func TestSyncPodUnhealthy(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
@ -1006,7 +1001,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
}
func TestMountExternalVolumes(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet})
pod := api.BoundPod{
@ -1040,7 +1035,7 @@ func TestMountExternalVolumes(t *testing.T) {
}
func TestGetPodVolumesFromDisk(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
plug := &volume.FakePlugin{"fake", nil}
kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet})
@ -1315,7 +1310,7 @@ func TestGetContainerInfo(t *testing.T) {
cadvisorReq := &info.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
fakeDocker.ContainerList = []docker.APIContainers{
{
@ -1353,6 +1348,7 @@ func TestGetRootInfo(t *testing.T) {
dockerClient: &fakeDocker,
dockerPuller: &dockertools.FakeDockerPuller{},
cadvisorClient: mockCadvisor,
podWorkers: newPodWorkers(),
}
// If the container name is an empty string, then it means the root container.
@ -1364,7 +1360,7 @@ func TestGetRootInfo(t *testing.T) {
}
func TestGetContainerInfoWithoutCadvisor(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
{
ID: "foobar",
@ -1389,7 +1385,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
cadvisorReq := &info.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, ErrCadvisorApiFailure)
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
fakeDocker.ContainerList = []docker.APIContainers{
{
@ -1417,7 +1413,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
fakeDocker.ContainerList = []docker.APIContainers{}
@ -1431,7 +1427,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
expectedErr := fmt.Errorf("List containers error")
kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr}
@ -1451,7 +1447,7 @@ func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) {
func TestGetContainerInfoWithNoContainers(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil}
@ -1470,7 +1466,7 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) {
func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
mockCadvisor := &mockCadvisorClient{}
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor
containerList := []docker.APIContainers{
@ -1534,7 +1530,7 @@ func (f *fakeContainerCommandRunner) PortForward(podInfraContainerID string, por
func TestRunInContainerNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner
@ -1556,7 +1552,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
func TestRunInContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
containerID := "abc1234"
@ -1597,7 +1593,7 @@ func TestRunInContainer(t *testing.T) {
func TestRunHandlerExec(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
containerID := "abc1234"
@ -1645,7 +1641,7 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) {
func TestRunHandlerHttp(t *testing.T) {
fakeHttp := fakeHTTP{}
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
kubelet.httpClient = &fakeHttp
podName := "podFoo"
@ -1674,7 +1670,7 @@ func TestRunHandlerHttp(t *testing.T) {
}
func TestNewHandler(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
kubelet, _ := newTestKubelet(t)
handler := &api.Handler{
HTTPGet: &api.HTTPGetAction{
Host: "foo",
@ -1705,7 +1701,7 @@ func TestNewHandler(t *testing.T) {
}
func TestSyncPodEventHandlerFails(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.httpClient = &fakeHTTP{
err: fmt.Errorf("test error"),
}
@ -1894,7 +1890,7 @@ func TestKubeletGarbageCollection(t *testing.T) {
},
}
for _, test := range tests {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.maxContainerCount = 2
fakeDocker.ContainerList = test.containers
fakeDocker.ContainerMap = test.containerDetails
@ -2059,7 +2055,7 @@ func TestPurgeOldest(t *testing.T) {
},
}
for _, test := range tests {
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.maxContainerCount = 5
fakeDocker.ContainerMap = test.containerDetails
kubelet.purgeOldest(test.ids)
@ -2070,12 +2066,11 @@ func TestPurgeOldest(t *testing.T) {
}
func TestSyncPodsWithPullPolicy(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{"existing_one", "want:latest"}
kubelet.podInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
waitGroup.Add(1)
err := kubelet.SyncPods([]api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
@ -2094,11 +2089,11 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
}, emptyPodUIDs, time.Now())
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
kubelet.drainWorkers()
fakeDocker.Lock()
@ -2404,7 +2399,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
}
for _, tc := range testCases {
kl, _, _ := newTestKubelet(t)
kl, _ := newTestKubelet(t)
kl.masterServiceNamespace = tc.masterServiceNamespace
if tc.nilLister {
kl.serviceLister = nil
@ -2841,7 +2836,7 @@ func TestGetPodReadyCondition(t *testing.T) {
func TestExecInContainerNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner
@ -2868,7 +2863,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
func TestExecInContainerNoSuchContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
@ -2921,7 +2916,7 @@ func (f *fakeReadWriteCloser) Close() error {
func TestExecInContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
@ -2980,7 +2975,7 @@ func TestExecInContainer(t *testing.T) {
func TestPortForwardNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner
@ -3004,7 +2999,7 @@ func TestPortForwardNoSuchPod(t *testing.T) {
func TestPortForwardNoSuchContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
@ -3039,7 +3034,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) {
func TestPortForward(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet, fakeDocker := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
podName := "podFoo"

View File

@ -18,7 +18,6 @@ package metrics
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -36,16 +35,8 @@ var (
Help: "Image pull latency in microseconds.",
},
)
// TODO(vmarmol): Break down by number of containers in pod?
SyncPodLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "sync_pod_latency_microseconds",
Help: "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync",
},
[]string{"operation_type"},
)
// TODO(vmarmol): Containers per pod
// TODO(vmarmol): Latency of pod startup
// TODO(vmarmol): Latency of SyncPods
)
@ -56,37 +47,10 @@ func Register(containerCache dockertools.DockerCache) {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(ImagePullLatency)
prometheus.MustRegister(SyncPodLatency)
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
})
}
type SyncPodType int
const (
SyncPodCreate SyncPodType = iota
SyncPodUpdate
SyncPodSync
)
func (self SyncPodType) String() string {
switch self {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}
// Gets the time since the specified start in microseconds.
func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector {
return &podAndContainerCollector{
containerCache: containerCache,

View File

@ -1,113 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
type syncPodFunType func(*api.BoundPod, dockertools.DockerContainers) error
// TODO(wojtek-t) Add unit tests for this type.
type podWorkers struct {
// Protects podUpdates field.
podLock sync.Mutex
// Tracks all running per-pod goroutines - per-pod goroutine will be
// processing updates received through its corresponding channel.
podUpdates map[types.UID]chan workUpdate
// DockerCache is used for listing running containers.
dockerCache dockertools.DockerCache
// This function is run to sync the desired stated of pod.
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
syncPodFun syncPodFunType
}
type workUpdate struct {
// The pod state to reflect.
pod *api.BoundPod
// Function to call when the update is complete.
updateCompleteFun func()
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
dockerCache: dockerCache,
syncPodFun: syncPodFun,
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
for newWork := range podUpdates {
// Since we use docker cache, getting current state shouldn't cause
// performance overhead on Docker. Moreover, as long as we run syncPod
// no matter if it changes anything, having an old version of "containers"
// can cause starting eunended containers.
containers, err := p.dockerCache.RunningContainers()
if err != nil {
glog.Errorf("Error listing containers while syncing pod: %v", err)
continue
}
err = p.syncPodFun(newWork.pod, containers)
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
continue
}
}
}
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
uid := pod.UID
var podUpdates chan workUpdate
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// TODO(wojtek-t): Adjust the size of the buffer in this channel
podUpdates = make(chan workUpdate, 5)
p.podUpdates[uid] = podUpdates
go p.managePodLoop(podUpdates)
}
podUpdates <- workUpdate{
pod: pod,
updateCompleteFun: updateComplete,
}
}
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
p.podLock.Lock()
defer p.podLock.Unlock()
for key, channel := range p.podUpdates {
if _, exists := desiredPods[key]; !exists {
close(channel)
delete(p.podUpdates, key)
}
}
}