Merge pull request #5730 from yujuhong/static_stats

Kubelet: support retrieving stats using UID of mirror pods
This commit is contained in:
Victor Marmol 2015-03-20 17:05:23 -07:00
commit 4d2e7981bb
6 changed files with 240 additions and 47 deletions

View File

@ -95,7 +95,7 @@ type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet,
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods,
startTime time.Time) error
}
@ -267,7 +267,7 @@ type Kubelet struct {
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods util.StringSet
mirrorPods mirrorPods
// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
@ -1488,7 +1488,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
@ -1536,7 +1536,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() {
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
@ -1606,7 +1606,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager)
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)
return err
}
@ -1885,7 +1885,7 @@ func (kl *Kubelet) GetHostname() string {
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
@ -2057,6 +2057,8 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.translatePodUID(uid)
// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found {
@ -2119,6 +2121,8 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
uid = kl.translatePodUID(uid)
if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
}
@ -2136,6 +2140,8 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
uid = kl.translatePodUID(uid)
if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
@ -2153,6 +2159,8 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
uid = kl.translatePodUID(uid)
if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
@ -2185,8 +2193,30 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}
kl.podLock.RLock()
defer kl.podLock.RUnlock()
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
uid = kl.translatePodUID(uid)
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
return nil, err

View File

@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -585,7 +585,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -694,7 +694,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -764,7 +764,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(2)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -805,7 +805,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
@ -813,7 +813,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@ -852,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1731,7 +1731,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
}, emptyPodUIDs, util.NewStringSet(), time.Now())
}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -2936,7 +2936,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
@ -3183,23 +3183,119 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}
mirrorPods := util.NewStringSet()
for _, name := range orphanedPodNames {
mirrorPods.Insert(name)
orphanPods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "pod1",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "12345679",
Name: "pod2",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
}
mirrorPods := newMirrorPods()
for _, pod := range orphanPods {
mirrorPods.Insert(&pod)
}
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now())
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
for _, name := range orphanedPodNames {
for _, pod := range orphanPods {
name := GetPodFullName(&pod)
creates, deletes := manager.GetCounts(name)
if creates != 0 || deletes != 1 {
t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
}
}
}
func TestGetContainerInfoForMirrorPods(t *testing.T) {
// pods contain one static and one mirror pod with the same name but
// different UIDs.
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "1234",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "5678",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
}
containerID := "ab2cdf"
containerPath := fmt.Sprintf("/docker/%v", containerID)
containerInfo := cadvisorApi.ContainerInfo{
ContainerReference: cadvisorApi.ContainerReference{
Name: containerPath,
},
}
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
mockCadvisor := testKubelet.fakeCadvisor
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
Names: []string{"/k8s_foo_qux_ns_1234_42"},
},
}
kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods)
// Use the mirror pod UID to retrieve the stats.
stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if stats == nil {
t.Fatalf("stats should not be nil")
}
mockCadvisor.AssertExpectations(t)
}

View File

@ -21,7 +21,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
@ -86,15 +86,10 @@ func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error {
}
// Delete all orphaned mirror pods.
func deleteOrphanedMirrorPods(pods []api.Pod, mirrorPods util.StringSet, manager mirrorManager) {
existingPods := util.NewStringSet()
for _, pod := range pods {
existingPods.Insert(GetPodFullName(&pod))
}
for podFullName := range mirrorPods {
if !existingPods.Has(podFullName) {
manager.DeleteMirrorPod(podFullName)
}
func deleteOrphanedMirrorPods(mirrorPods mirrorPods, manager mirrorManager) {
podFullNames := mirrorPods.GetOrphanedMirrorPodNames()
for _, podFullName := range podFullNames {
manager.DeleteMirrorPod(podFullName)
}
}
@ -123,16 +118,90 @@ func isMirrorPod(pod *api.Pod) bool {
// This function separate the mirror pods from regular pods to
// facilitate pods syncing and mirror pod creation/deletion.
func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, util.StringSet) {
func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, mirrorPods) {
filteredPods := []api.Pod{}
mirrorPods := util.NewStringSet()
mirrorPods := newMirrorPods()
for _, pod := range pods {
name := GetPodFullName(&pod)
if isMirrorPod(&pod) {
mirrorPods.Insert(name)
} else {
mirrorPods.Insert(&pod)
if !isMirrorPod(&pod) {
filteredPods = append(filteredPods, pod)
}
}
return filteredPods, mirrorPods
return filteredPods, *mirrorPods
}
// mirrorPods is thread-compatible.
// TODO (yujuhong): Replace this with a pod manager that manages both regular
// pods and mirror pods.
type mirrorPods struct {
// Static pod UIDs indexed by pod full name.
static map[string]types.UID
// Mirror pod UIDs indexed by pod full name.
mirror map[string]types.UID
// Bi-directional UID mappings.
staticToMirror map[types.UID]types.UID
mirrorToStatic map[types.UID]types.UID
}
func newMirrorPods() *mirrorPods {
mirrorPods := mirrorPods{}
mirrorPods.static = make(map[string]types.UID)
mirrorPods.mirror = make(map[string]types.UID)
mirrorPods.staticToMirror = make(map[types.UID]types.UID)
mirrorPods.mirrorToStatic = make(map[types.UID]types.UID)
return &mirrorPods
}
func (self *mirrorPods) Insert(pod *api.Pod) {
podFullName := GetPodFullName(pod)
if isMirrorPod(pod) {
self.mirror[podFullName] = pod.UID
} else if isStaticPod(pod) {
self.static[podFullName] = pod.UID
}
staticUID, found1 := self.static[podFullName]
mirrorUID, found2 := self.mirror[podFullName]
// Update the UID mappings.
if found1 && found2 {
self.staticToMirror[staticUID] = mirrorUID
self.mirrorToStatic[mirrorUID] = staticUID
}
}
func (self *mirrorPods) HasStaticPod(key types.UID) bool {
_, ok := self.mirrorToStatic[key]
return ok
}
func (self *mirrorPods) HasMirrorPod(key types.UID) bool {
_, ok := self.staticToMirror[key]
return ok
}
func (self *mirrorPods) GetMirrorUID(key types.UID) (types.UID, bool) {
value, ok := self.staticToMirror[key]
if !ok {
return "", false
}
return value, true
}
func (self *mirrorPods) GetStaticUID(key types.UID) (types.UID, bool) {
value, ok := self.mirrorToStatic[key]
if !ok {
return "", false
}
return value, true
}
func (self *mirrorPods) GetOrphanedMirrorPodNames() []string {
orphanedPodNames := []string{}
for podFullName := range self.mirror {
if _, ok := self.static[podFullName]; !ok {
orphanedPodNames = append(orphanedPodNames, podFullName)
}
}
return orphanedPodNames
}

View File

@ -121,7 +121,7 @@ func TestFilterOutMirrorPods(t *testing.T) {
if !reflect.DeepEqual(expectedPods, actualPods) {
t.Errorf("expected %#v, got %#v", expectedPods, actualPods)
}
if !actualMirrorPods.Has(GetPodFullName(&mirrorPod)) {
if _, ok := actualMirrorPods.mirror[GetPodFullName(&mirrorPod)]; !ok {
t.Errorf("mirror pod is not recorded")
}
}

View File

@ -37,7 +37,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
"github.com/golang/glog"
@ -83,7 +82,7 @@ type HostInterface interface {
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
GetDockerVersion() ([]uint, error)
GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
GetPods() ([]api.Pod, util.StringSet)
GetPods() ([]api.Pod, mirrorPods)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)

View File

@ -33,7 +33,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
cadvisorApi "github.com/google/cadvisor/info/v1"
@ -45,7 +44,7 @@ type fakeKubelet struct {
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
machineInfoFunc func() (*cadvisorApi.MachineInfo, error)
podsFunc func() ([]api.Pod, util.StringSet)
podsFunc func() ([]api.Pod, mirrorPods)
logFunc func(w http.ResponseWriter, req *http.Request)
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
dockerVersionFunc func() ([]uint, error)
@ -80,7 +79,7 @@ func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
return fk.machineInfoFunc()
}
func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet) {
func (fk *fakeKubelet) GetPods() ([]api.Pod, mirrorPods) {
return fk.podsFunc()
}