Merge pull request #7037 from xiang90/no-self

pkg/kubelet: no more self receiver in kubelet pkg
This commit is contained in:
Victor Marmol 2015-04-20 08:03:48 -07:00
commit 446f349910
11 changed files with 245 additions and 244 deletions

View File

@ -77,9 +77,9 @@ func New(port uint) (Interface, error) {
return cadvisorClient, nil return cadvisorClient, nil
} }
func (self *cadvisorClient) exportHTTP(port uint) error { func (cc *cadvisorClient) exportHTTP(port uint) error {
mux := http.NewServeMux() mux := http.NewServeMux()
err := cadvisorHttp.RegisterHandlers(mux, self, "", "", "", "", "/metrics") err := cadvisorHttp.RegisterHandlers(mux, cc, "", "", "", "", "/metrics")
if err != nil { if err != nil {
return err return err
} }
@ -106,20 +106,20 @@ func (self *cadvisorClient) exportHTTP(port uint) error {
return nil return nil
} }
func (self *cadvisorClient) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) { func (cc *cadvisorClient) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
return self.GetContainerInfo(name, req) return cc.GetContainerInfo(name, req)
} }
func (self *cadvisorClient) VersionInfo() (*cadvisorApi.VersionInfo, error) { func (cc *cadvisorClient) VersionInfo() (*cadvisorApi.VersionInfo, error) {
return self.GetVersionInfo() return cc.GetVersionInfo()
} }
func (self *cadvisorClient) MachineInfo() (*cadvisorApi.MachineInfo, error) { func (cc *cadvisorClient) MachineInfo() (*cadvisorApi.MachineInfo, error) {
return self.GetMachineInfo() return cc.GetMachineInfo()
} }
func (self *cadvisorClient) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) { func (cc *cadvisorClient) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) {
res, err := self.GetFsInfo(cadvisorFs.LabelDockerImages) res, err := cc.GetFsInfo(cadvisorFs.LabelDockerImages)
if err != nil { if err != nil {
return cadvisorApiV2.FsInfo{}, err return cadvisorApiV2.FsInfo{}, err
} }
@ -134,6 +134,6 @@ func (self *cadvisorClient) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) {
return res[0], nil return res[0], nil
} }
func (self *cadvisorClient) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) { func (cc *cadvisorClient) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) {
return self.GetPastEvents(request) return cc.GetPastEvents(request)
} }

View File

@ -37,26 +37,26 @@ func New(port uint) (Interface, error) {
var unsupportedErr = errors.New("cAdvisor is unsupported in this build") var unsupportedErr = errors.New("cAdvisor is unsupported in this build")
func (self *cadvisorUnsupported) DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error) { func (cu *cadvisorUnsupported) DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error) {
return cadvisorApi.ContainerInfo{}, unsupportedErr return cadvisorApi.ContainerInfo{}, unsupportedErr
} }
func (self *cadvisorUnsupported) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) { func (cu *cadvisorUnsupported) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
return nil, unsupportedErr return nil, unsupportedErr
} }
func (self *cadvisorUnsupported) MachineInfo() (*cadvisorApi.MachineInfo, error) { func (cu *cadvisorUnsupported) MachineInfo() (*cadvisorApi.MachineInfo, error) {
return nil, unsupportedErr return nil, unsupportedErr
} }
func (self *cadvisorUnsupported) VersionInfo() (*cadvisorApi.VersionInfo, error) { func (cu *cadvisorUnsupported) VersionInfo() (*cadvisorApi.VersionInfo, error) {
return nil, unsupportedErr return nil, unsupportedErr
} }
func (self *cadvisorUnsupported) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) { func (cu *cadvisorUnsupported) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) {
return cadvisorApiV2.FsInfo{}, unsupportedErr return cadvisorApiV2.FsInfo{}, unsupportedErr
} }
func (self *cadvisorUnsupported) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) { func (cu *cadvisorUnsupported) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) {
return []*cadvisorApi.Event{}, unsupportedErr return []*cadvisorApi.Event{}, unsupportedErr
} }

View File

@ -89,21 +89,22 @@ type evictUnit struct {
// Name of the container in the pod. // Name of the container in the pod.
name string name string
} }
type containersByEvictUnit map[evictUnit][]containerGCInfo type containersByEvictUnit map[evictUnit][]containerGCInfo
// Returns the number of containers in this map. // Returns the number of containers in this map.
func (self containersByEvictUnit) NumContainers() int { func (cu containersByEvictUnit) NumContainers() int {
num := 0 num := 0
for key := range self { for key := range cu {
num += len(self[key]) num += len(cu[key])
} }
return num return num
} }
// Returns the number of pod in this map. // Returns the number of pod in this map.
func (self containersByEvictUnit) NumEvictUnits() int { func (cu containersByEvictUnit) NumEvictUnits() int {
return len(self) return len(cu)
} }
// Newest first. // Newest first.
@ -113,9 +114,9 @@ func (a byCreated) Len() int { return len(a) }
func (a byCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byCreated) Less(i, j int) bool { return a[i].createTime.After(a[j].createTime) } func (a byCreated) Less(i, j int) bool { return a[i].createTime.After(a[j].createTime) }
func (self *realContainerGC) GarbageCollect() error { func (cgc *realContainerGC) GarbageCollect() error {
// Separate containers by evict units. // Separate containers by evict units.
evictUnits, unidentifiedContainers, err := self.evictableContainers() evictUnits, unidentifiedContainers, err := cgc.evictableContainers()
if err != nil { if err != nil {
return err return err
} }
@ -123,58 +124,58 @@ func (self *realContainerGC) GarbageCollect() error {
// Remove unidentified containers. // Remove unidentified containers.
for _, container := range unidentifiedContainers { for _, container := range unidentifiedContainers {
glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id) glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
err = self.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.id}) err = cgc.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.id})
if err != nil { if err != nil {
glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err) glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err)
} }
} }
// Enforce max containers per evict unit. // Enforce max containers per evict unit.
if self.policy.MaxPerPodContainer >= 0 { if cgc.policy.MaxPerPodContainer >= 0 {
self.enforceMaxContainersPerEvictUnit(evictUnits, self.policy.MaxPerPodContainer) cgc.enforceMaxContainersPerEvictUnit(evictUnits, cgc.policy.MaxPerPodContainer)
} }
// Enforce max total number of containers. // Enforce max total number of containers.
if self.policy.MaxContainers >= 0 && evictUnits.NumContainers() > self.policy.MaxContainers { if cgc.policy.MaxContainers >= 0 && evictUnits.NumContainers() > cgc.policy.MaxContainers {
// Leave an equal number of containers per evict unit (min: 1). // Leave an equal number of containers per evict unit (min: 1).
numContainersPerEvictUnit := self.policy.MaxContainers / evictUnits.NumEvictUnits() numContainersPerEvictUnit := cgc.policy.MaxContainers / evictUnits.NumEvictUnits()
if numContainersPerEvictUnit < 1 { if numContainersPerEvictUnit < 1 {
numContainersPerEvictUnit = 1 numContainersPerEvictUnit = 1
} }
self.enforceMaxContainersPerEvictUnit(evictUnits, numContainersPerEvictUnit) cgc.enforceMaxContainersPerEvictUnit(evictUnits, numContainersPerEvictUnit)
// If we still need to evict, evict oldest first. // If we still need to evict, evict oldest first.
numContainers := evictUnits.NumContainers() numContainers := evictUnits.NumContainers()
if numContainers > self.policy.MaxContainers { if numContainers > cgc.policy.MaxContainers {
flattened := make([]containerGCInfo, 0, numContainers) flattened := make([]containerGCInfo, 0, numContainers)
for uid := range evictUnits { for uid := range evictUnits {
flattened = append(flattened, evictUnits[uid]...) flattened = append(flattened, evictUnits[uid]...)
} }
sort.Sort(byCreated(flattened)) sort.Sort(byCreated(flattened))
self.removeOldestN(flattened, numContainers-self.policy.MaxContainers) cgc.removeOldestN(flattened, numContainers-cgc.policy.MaxContainers)
} }
} }
return nil return nil
} }
func (self *realContainerGC) enforceMaxContainersPerEvictUnit(evictUnits containersByEvictUnit, MaxContainers int) { func (cgc *realContainerGC) enforceMaxContainersPerEvictUnit(evictUnits containersByEvictUnit, MaxContainers int) {
for uid := range evictUnits { for uid := range evictUnits {
toRemove := len(evictUnits[uid]) - MaxContainers toRemove := len(evictUnits[uid]) - MaxContainers
if toRemove > 0 { if toRemove > 0 {
evictUnits[uid] = self.removeOldestN(evictUnits[uid], toRemove) evictUnits[uid] = cgc.removeOldestN(evictUnits[uid], toRemove)
} }
} }
} }
// Removes the oldest toRemove containers and returns the resulting slice. // Removes the oldest toRemove containers and returns the resulting slice.
func (self *realContainerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo { func (cgc *realContainerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo {
// Remove from oldest to newest (last to first). // Remove from oldest to newest (last to first).
numToKeep := len(containers) - toRemove numToKeep := len(containers) - toRemove
for i := numToKeep; i < len(containers); i++ { for i := numToKeep; i < len(containers); i++ {
err := self.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: containers[i].id}) err := cgc.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: containers[i].id})
if err != nil { if err != nil {
glog.Warningf("Failed to remove dead container %q: %v", containers[i].name, err) glog.Warningf("Failed to remove dead container %q: %v", containers[i].name, err)
} }
@ -186,18 +187,18 @@ func (self *realContainerGC) removeOldestN(containers []containerGCInfo, toRemov
// Get all containers that are evictable. Evictable containers are: not running // Get all containers that are evictable. Evictable containers are: not running
// and created more than MinAge ago. // and created more than MinAge ago.
func (self *realContainerGC) evictableContainers() (containersByEvictUnit, []containerGCInfo, error) { func (cgc *realContainerGC) evictableContainers() (containersByEvictUnit, []containerGCInfo, error) {
containers, err := dockertools.GetKubeletDockerContainers(self.dockerClient, true) containers, err := dockertools.GetKubeletDockerContainers(cgc.dockerClient, true)
if err != nil { if err != nil {
return containersByEvictUnit{}, []containerGCInfo{}, err return containersByEvictUnit{}, []containerGCInfo{}, err
} }
unidentifiedContainers := make([]containerGCInfo, 0) unidentifiedContainers := make([]containerGCInfo, 0)
evictUnits := make(containersByEvictUnit) evictUnits := make(containersByEvictUnit)
newestGCTime := time.Now().Add(-self.policy.MinAge) newestGCTime := time.Now().Add(-cgc.policy.MinAge)
for _, container := range containers { for _, container := range containers {
// Prune out running containers. // Prune out running containers.
data, err := self.dockerClient.InspectContainer(container.ID) data, err := cgc.dockerClient.InspectContainer(container.ID)
if err != nil { if err != nil {
// Container may have been removed already, skip. // Container may have been removed already, skip.
continue continue

View File

@ -87,26 +87,26 @@ type stringCache struct {
cache *lru.Cache cache *lru.Cache
} }
func (self *stringCache) composeKey(uid types.UID, name string) string { func (sc *stringCache) composeKey(uid types.UID, name string) string {
return fmt.Sprintf("%s_%s", uid, name) return fmt.Sprintf("%s_%s", uid, name)
} }
func (self *stringCache) Add(uid types.UID, name string, value string) { func (sc *stringCache) Add(uid types.UID, name string, value string) {
self.lock.Lock() sc.lock.Lock()
defer self.lock.Unlock() defer sc.lock.Unlock()
self.cache.Add(self.composeKey(uid, name), value) sc.cache.Add(sc.composeKey(uid, name), value)
} }
func (self *stringCache) Remove(uid types.UID, name string) { func (sc *stringCache) Remove(uid types.UID, name string) {
self.lock.Lock() sc.lock.Lock()
defer self.lock.Unlock() defer sc.lock.Unlock()
self.cache.Remove(self.composeKey(uid, name)) sc.cache.Remove(sc.composeKey(uid, name))
} }
func (self *stringCache) Get(uid types.UID, name string) (string, bool) { func (sc *stringCache) Get(uid types.UID, name string) (string, bool) {
self.lock.RLock() sc.lock.RLock()
defer self.lock.RUnlock() defer sc.lock.RUnlock()
value, ok := self.cache.Get(self.composeKey(uid, name)) value, ok := sc.cache.Get(sc.composeKey(uid, name))
if ok { if ok {
return value.(string), ok return value.(string), ok
} else { } else {
@ -119,7 +119,7 @@ func (self *stringCache) Get(uid types.UID, name string) (string, bool) {
// stream the log. Set |follow| to false and specify the number of lines (e.g. // stream the log. Set |follow| to false and specify the number of lines (e.g.
// "100" or "all") to tail the log. // "100" or "all") to tail the log.
// TODO: Make 'RawTerminal' option flagable. // TODO: Make 'RawTerminal' option flagable.
func (self *DockerManager) GetKubeletDockerContainerLogs(containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) { func (dm *DockerManager) GetKubeletDockerContainerLogs(containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) {
opts := docker.LogsOptions{ opts := docker.LogsOptions{
Container: containerID, Container: containerID,
Stdout: true, Stdout: true,
@ -135,7 +135,7 @@ func (self *DockerManager) GetKubeletDockerContainerLogs(containerID, tail strin
opts.Tail = tail opts.Tail = tail
} }
err = self.client.Logs(opts) err = dm.client.Logs(opts)
return return
} }
@ -157,10 +157,10 @@ type containerStatusResult struct {
err error err error
} }
func (self *DockerManager) inspectContainer(dockerID, containerName, tPath string) *containerStatusResult { func (dm *DockerManager) inspectContainer(dockerID, containerName, tPath string) *containerStatusResult {
result := containerStatusResult{api.ContainerStatus{}, "", nil} result := containerStatusResult{api.ContainerStatus{}, "", nil}
inspectResult, err := self.client.InspectContainer(dockerID) inspectResult, err := dm.client.InspectContainer(dockerID)
if err != nil { if err != nil {
result.err = err result.err = err
@ -226,7 +226,7 @@ func (self *DockerManager) inspectContainer(dockerID, containerName, tPath strin
// GetPodStatus returns docker related status for all containers in the pod as // GetPodStatus returns docker related status for all containers in the pod as
// well as the infrastructure container. // well as the infrastructure container.
func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID uid := pod.UID
manifest := pod.Spec manifest := pod.Spec
@ -249,7 +249,7 @@ func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
} }
expectedContainers[PodInfraContainerName] = api.Container{} expectedContainers[PodInfraContainerName] = api.Container{}
containers, err := self.client.ListContainers(docker.ListContainersOptions{All: true}) containers, err := dm.client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -284,7 +284,7 @@ func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
var terminationState *api.ContainerState = nil var terminationState *api.ContainerState = nil
// Inspect the container. // Inspect the container.
result := self.inspectContainer(value.ID, dockerContainerName, terminationMessagePath) result := dm.inspectContainer(value.ID, dockerContainerName, terminationMessagePath)
if result.err != nil { if result.err != nil {
return nil, result.err return nil, result.err
} else if result.status.State.Termination != nil { } else if result.status.State.Termination != nil {
@ -347,7 +347,7 @@ func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
// record the pull failure and eliminate the image checking below. // record the pull failure and eliminate the image checking below.
image := container.Image image := container.Image
// TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists // TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists
_, err := self.client.InspectImage(image) _, err := dm.client.InspectImage(image)
if err == nil { if err == nil {
containerStatus.State.Waiting = &api.ContainerStateWaiting{ containerStatus.State.Waiting = &api.ContainerStateWaiting{
Reason: fmt.Sprintf("Image: %s is ready, container is creating", image), Reason: fmt.Sprintf("Image: %s is ready, container is creating", image),
@ -364,7 +364,7 @@ func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
for containerName, status := range statuses { for containerName, status := range statuses {
if status.State.Waiting != nil { if status.State.Waiting != nil {
// For containers in the waiting state, fill in a specific reason if it is recorded. // For containers in the waiting state, fill in a specific reason if it is recorded.
if reason, ok := self.reasonCache.Get(uid, containerName); ok { if reason, ok := dm.reasonCache.Get(uid, containerName); ok {
status.State.Waiting.Reason = reason status.State.Waiting.Reason = reason
} }
} }
@ -374,13 +374,13 @@ func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
return &podStatus, nil return &podStatus, nil
} }
func (self *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) { func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
result := []*docker.Container{} result := []*docker.Container{}
if self.client == nil { if dm.client == nil {
return nil, fmt.Errorf("unexpected nil docker client.") return nil, fmt.Errorf("unexpected nil docker client.")
} }
for ix := range ids { for ix := range ids {
status, err := self.client.InspectContainer(ids[ix]) status, err := dm.client.InspectContainer(ids[ix])
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -391,20 +391,20 @@ func (self *DockerManager) GetRunningContainers(ids []string) ([]*docker.Contain
return result, nil return result, nil
} }
func (self *DockerManager) RunContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) { func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) {
dockerID, err := self.runContainer(pod, container, opts) dockerID, err := dm.runContainer(pod, container, opts)
if err != nil { if err != nil {
errString := err.Error() errString := err.Error()
if errString != "" { if errString != "" {
self.reasonCache.Add(pod.UID, container.Name, errString) dm.reasonCache.Add(pod.UID, container.Name, errString)
} else { } else {
self.reasonCache.Remove(pod.UID, container.Name) dm.reasonCache.Remove(pod.UID, container.Name)
} }
} }
return dockerID, err return dockerID, err
} }
func (self *DockerManager) runContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) { func (dm *DockerManager) runContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container) ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil { if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
@ -441,16 +441,16 @@ func (self *DockerManager) runContainer(pod *api.Pod, container *api.Container,
glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd) glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd)
dockerContainer, err := self.client.CreateContainer(dockerOpts) dockerContainer, err := dm.client.CreateContainer(dockerOpts)
if err != nil { if err != nil {
if ref != nil { if ref != nil {
self.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err) dm.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err)
} }
return "", err return "", err
} }
if ref != nil { if ref != nil {
self.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID) dm.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID)
} }
// The reason we create and mount the log file in here (not in kubelet) is because // The reason we create and mount the log file in here (not in kubelet) is because
@ -495,15 +495,15 @@ func (self *DockerManager) runContainer(pod *api.Pod, container *api.Container,
hc.DNSSearch = opts.DNSSearch hc.DNSSearch = opts.DNSSearch
} }
if err = self.client.StartContainer(dockerContainer.ID, hc); err != nil { if err = dm.client.StartContainer(dockerContainer.ID, hc); err != nil {
if ref != nil { if ref != nil {
self.recorder.Eventf(ref, "failed", dm.recorder.Eventf(ref, "failed",
"Failed to start with docker id %v with error: %v", dockerContainer.ID, err) "Failed to start with docker id %v with error: %v", dockerContainer.ID, err)
} }
return "", err return "", err
} }
if ref != nil { if ref != nil {
self.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID) dm.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID)
} }
return dockerContainer.ID, nil return dockerContainer.ID, nil
} }
@ -565,11 +565,11 @@ func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType)
return addCaps, dropCaps return addCaps, dropCaps
} }
func (self *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) { func (dm *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
pods := make(map[types.UID]*kubecontainer.Pod) pods := make(map[types.UID]*kubecontainer.Pod)
var result []*kubecontainer.Pod var result []*kubecontainer.Pod
containers, err := GetKubeletDockerContainers(self.client, all) containers, err := GetKubeletDockerContainers(dm.client, all)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -614,20 +614,20 @@ func (self *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
return result, nil return result, nil
} }
func (self *DockerManager) Pull(image string) error { func (dm *DockerManager) Pull(image string) error {
return self.Puller.Pull(image) return dm.Puller.Pull(image)
} }
func (self *DockerManager) IsImagePresent(image string) (bool, error) { func (dm *DockerManager) IsImagePresent(image string) (bool, error) {
return self.Puller.IsImagePresent(image) return dm.Puller.IsImagePresent(image)
} }
// PodInfraContainer returns true if the pod infra container has changed. // PodInfraContainer returns true if the pod infra container has changed.
func (self *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) { func (dm *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) {
networkMode := "" networkMode := ""
var ports []api.ContainerPort var ports []api.ContainerPort
dockerPodInfraContainer, err := self.client.InspectContainer(string(podInfraContainer.ID)) dockerPodInfraContainer, err := dm.client.InspectContainer(string(podInfraContainer.ID))
if err != nil { if err != nil {
return false, err return false, err
} }
@ -650,7 +650,7 @@ func (self *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContai
} }
expectedPodInfraContainer := &api.Container{ expectedPodInfraContainer := &api.Container{
Name: PodInfraContainerName, Name: PodInfraContainerName,
Image: self.PodInfraContainerImage, Image: dm.PodInfraContainerImage,
Ports: ports, Ports: ports,
} }
return podInfraContainer.Hash != HashContainer(expectedPodInfraContainer), nil return podInfraContainer.Hash != HashContainer(expectedPodInfraContainer), nil

View File

@ -112,16 +112,16 @@ func newImageManager(dockerClient dockertools.DockerInterface, cadvisorInterface
return im, nil return im, nil
} }
func (self *realImageManager) start() error { func (im *realImageManager) start() error {
// Initial detection make detected time "unknown" in the past. // Initial detection make detected time "unknown" in the past.
var zero time.Time var zero time.Time
err := self.detectImages(zero) err := im.detectImages(zero)
if err != nil { if err != nil {
return err return err
} }
go util.Forever(func() { go util.Forever(func() {
err := self.detectImages(time.Now()) err := im.detectImages(time.Now())
if err != nil { if err != nil {
glog.Warningf("[ImageManager] Failed to monitor images: %v", err) glog.Warningf("[ImageManager] Failed to monitor images: %v", err)
} }
@ -130,12 +130,12 @@ func (self *realImageManager) start() error {
return nil return nil
} }
func (self *realImageManager) detectImages(detected time.Time) error { func (im *realImageManager) detectImages(detected time.Time) error {
images, err := self.dockerClient.ListImages(docker.ListImagesOptions{}) images, err := im.dockerClient.ListImages(docker.ListImagesOptions{})
if err != nil { if err != nil {
return err return err
} }
containers, err := self.dockerClient.ListContainers(docker.ListContainersOptions{ containers, err := im.dockerClient.ListContainers(docker.ListContainersOptions{
All: true, All: true,
}) })
if err != nil { if err != nil {
@ -151,39 +151,39 @@ func (self *realImageManager) detectImages(detected time.Time) error {
// Add new images and record those being used. // Add new images and record those being used.
now := time.Now() now := time.Now()
currentImages := util.NewStringSet() currentImages := util.NewStringSet()
self.imageRecordsLock.Lock() im.imageRecordsLock.Lock()
defer self.imageRecordsLock.Unlock() defer im.imageRecordsLock.Unlock()
for _, image := range images { for _, image := range images {
currentImages.Insert(image.ID) currentImages.Insert(image.ID)
// New image, set it as detected now. // New image, set it as detected now.
if _, ok := self.imageRecords[image.ID]; !ok { if _, ok := im.imageRecords[image.ID]; !ok {
self.imageRecords[image.ID] = &imageRecord{ im.imageRecords[image.ID] = &imageRecord{
detected: detected, detected: detected,
} }
} }
// Set last used time to now if the image is being used. // Set last used time to now if the image is being used.
if isImageUsed(&image, imagesInUse) { if isImageUsed(&image, imagesInUse) {
self.imageRecords[image.ID].lastUsed = now im.imageRecords[image.ID].lastUsed = now
} }
self.imageRecords[image.ID].size = image.VirtualSize im.imageRecords[image.ID].size = image.VirtualSize
} }
// Remove old images from our records. // Remove old images from our records.
for image := range self.imageRecords { for image := range im.imageRecords {
if !currentImages.Has(image) { if !currentImages.Has(image) {
delete(self.imageRecords, image) delete(im.imageRecords, image)
} }
} }
return nil return nil
} }
func (self *realImageManager) GarbageCollect() error { func (im *realImageManager) GarbageCollect() error {
// Get disk usage on disk holding images. // Get disk usage on disk holding images.
fsInfo, err := self.cadvisor.DockerImagesFsInfo() fsInfo, err := im.cadvisor.DockerImagesFsInfo()
if err != nil { if err != nil {
return err return err
} }
@ -193,23 +193,23 @@ func (self *realImageManager) GarbageCollect() error {
// Check valid capacity. // Check valid capacity.
if capacity == 0 { if capacity == 0 {
err := fmt.Errorf("invalid capacity %d on device %q at mount point %q", capacity, fsInfo.Device, fsInfo.Mountpoint) err := fmt.Errorf("invalid capacity %d on device %q at mount point %q", capacity, fsInfo.Device, fsInfo.Mountpoint)
self.recorder.Eventf(self.nodeRef, "invalidDiskCapacity", err.Error()) im.recorder.Eventf(im.nodeRef, "invalidDiskCapacity", err.Error())
return err return err
} }
// If over the max threshold, free enough to place us at the lower threshold. // If over the max threshold, free enough to place us at the lower threshold.
usagePercent := int(usage * 100 / capacity) usagePercent := int(usage * 100 / capacity)
if usagePercent >= self.policy.HighThresholdPercent { if usagePercent >= im.policy.HighThresholdPercent {
amountToFree := usage - (int64(self.policy.LowThresholdPercent) * capacity / 100) amountToFree := usage - (int64(im.policy.LowThresholdPercent) * capacity / 100)
glog.Infof("[ImageManager]: Disk usage on %q (%s) is at %d%% which is over the high threshold (%d%%). Trying to free %d bytes", fsInfo.Device, fsInfo.Mountpoint, usagePercent, self.policy.HighThresholdPercent, amountToFree) glog.Infof("[ImageManager]: Disk usage on %q (%s) is at %d%% which is over the high threshold (%d%%). Trying to free %d bytes", fsInfo.Device, fsInfo.Mountpoint, usagePercent, im.policy.HighThresholdPercent, amountToFree)
freed, err := self.freeSpace(amountToFree) freed, err := im.freeSpace(amountToFree)
if err != nil { if err != nil {
return err return err
} }
if freed < amountToFree { if freed < amountToFree {
err := fmt.Errorf("failed to garbage collect required amount of images. Wanted to free %d, but freed %d", amountToFree, freed) err := fmt.Errorf("failed to garbage collect required amount of images. Wanted to free %d, but freed %d", amountToFree, freed)
self.recorder.Eventf(self.nodeRef, "freeDiskSpaceFailed", err.Error()) im.recorder.Eventf(im.nodeRef, "freeDiskSpaceFailed", err.Error())
return err return err
} }
} }
@ -223,19 +223,19 @@ func (self *realImageManager) GarbageCollect() error {
// bytes freed is always returned. // bytes freed is always returned.
// Note that error may be nil and the number of bytes free may be less // Note that error may be nil and the number of bytes free may be less
// than bytesToFree. // than bytesToFree.
func (self *realImageManager) freeSpace(bytesToFree int64) (int64, error) { func (im *realImageManager) freeSpace(bytesToFree int64) (int64, error) {
startTime := time.Now() startTime := time.Now()
err := self.detectImages(startTime) err := im.detectImages(startTime)
if err != nil { if err != nil {
return 0, err return 0, err
} }
self.imageRecordsLock.Lock() im.imageRecordsLock.Lock()
defer self.imageRecordsLock.Unlock() defer im.imageRecordsLock.Unlock()
// Get all images in eviction order. // Get all images in eviction order.
images := make([]evictionInfo, 0, len(self.imageRecords)) images := make([]evictionInfo, 0, len(im.imageRecords))
for image, record := range self.imageRecords { for image, record := range im.imageRecords {
images = append(images, evictionInfo{ images = append(images, evictionInfo{
id: image, id: image,
imageRecord: *record, imageRecord: *record,
@ -254,12 +254,12 @@ func (self *realImageManager) freeSpace(bytesToFree int64) (int64, error) {
// Remove image. Continue despite errors. // Remove image. Continue despite errors.
glog.Infof("[ImageManager]: Removing image %q to free %d bytes", image.id, image.size) glog.Infof("[ImageManager]: Removing image %q to free %d bytes", image.id, image.size)
err := self.dockerClient.RemoveImage(image.id) err := im.dockerClient.RemoveImage(image.id)
if err != nil { if err != nil {
lastErr = err lastErr = err
continue continue
} }
delete(self.imageRecords, image.id) delete(im.imageRecords, image.id)
spaceFreed += image.size spaceFreed += image.size
if spaceFreed >= bytesToFree { if spaceFreed >= bytesToFree {
@ -277,14 +277,14 @@ type evictionInfo struct {
type byLastUsedAndDetected []evictionInfo type byLastUsedAndDetected []evictionInfo
func (self byLastUsedAndDetected) Len() int { return len(self) } func (ev byLastUsedAndDetected) Len() int { return len(ev) }
func (self byLastUsedAndDetected) Swap(i, j int) { self[i], self[j] = self[j], self[i] } func (ev byLastUsedAndDetected) Swap(i, j int) { ev[i], ev[j] = ev[j], ev[i] }
func (self byLastUsedAndDetected) Less(i, j int) bool { func (ev byLastUsedAndDetected) Less(i, j int) bool {
// Sort by last used, break ties by detected. // Sort by last used, break ties by detected.
if self[i].lastUsed.Equal(self[j].lastUsed) { if ev[i].lastUsed.Equal(ev[j].lastUsed) {
return self[i].detected.Before(self[j].detected) return ev[i].detected.Before(ev[j].detected)
} else { } else {
return self[i].lastUsed.Before(self[j].lastUsed) return ev[i].lastUsed.Before(ev[j].lastUsed)
} }
} }

View File

@ -48,15 +48,15 @@ func newRealImageManager(policy ImageGCPolicy) (*realImageManager, *dockertools.
} }
// Accessors used for thread-safe testing. // Accessors used for thread-safe testing.
func (self *realImageManager) imageRecordsLen() int { func (im *realImageManager) imageRecordsLen() int {
self.imageRecordsLock.Lock() im.imageRecordsLock.Lock()
defer self.imageRecordsLock.Unlock() defer im.imageRecordsLock.Unlock()
return len(self.imageRecords) return len(im.imageRecords)
} }
func (self *realImageManager) getImageRecord(name string) (*imageRecord, bool) { func (im *realImageManager) getImageRecord(name string) (*imageRecord, bool) {
self.imageRecordsLock.Lock() im.imageRecordsLock.Lock()
defer self.imageRecordsLock.Unlock() defer im.imageRecordsLock.Unlock()
v, ok := self.imageRecords[name] v, ok := im.imageRecords[name]
vCopy := *v vCopy := *v
return &vCopy, ok return &vCopy, ok
} }

View File

@ -36,114 +36,114 @@ func NewInstrumentedDockerInterface(dockerClient dockertools.DockerInterface) do
} }
} }
func (self instrumentedDockerInterface) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) { func (in instrumentedDockerInterface) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("list_containers").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("list_containers").Observe(SinceInMicroseconds(start))
}() }()
return self.client.ListContainers(options) return in.client.ListContainers(options)
} }
func (self instrumentedDockerInterface) InspectContainer(id string) (*docker.Container, error) { func (in instrumentedDockerInterface) InspectContainer(id string) (*docker.Container, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("inspect_container").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("inspect_container").Observe(SinceInMicroseconds(start))
}() }()
return self.client.InspectContainer(id) return in.client.InspectContainer(id)
} }
func (self instrumentedDockerInterface) CreateContainer(opts docker.CreateContainerOptions) (*docker.Container, error) { func (in instrumentedDockerInterface) CreateContainer(opts docker.CreateContainerOptions) (*docker.Container, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("create_container").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("create_container").Observe(SinceInMicroseconds(start))
}() }()
return self.client.CreateContainer(opts) return in.client.CreateContainer(opts)
} }
func (self instrumentedDockerInterface) StartContainer(id string, hostConfig *docker.HostConfig) error { func (in instrumentedDockerInterface) StartContainer(id string, hostConfig *docker.HostConfig) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("start_container").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("start_container").Observe(SinceInMicroseconds(start))
}() }()
return self.client.StartContainer(id, hostConfig) return in.client.StartContainer(id, hostConfig)
} }
func (self instrumentedDockerInterface) StopContainer(id string, timeout uint) error { func (in instrumentedDockerInterface) StopContainer(id string, timeout uint) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("stop_container").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("stop_container").Observe(SinceInMicroseconds(start))
}() }()
return self.client.StopContainer(id, timeout) return in.client.StopContainer(id, timeout)
} }
func (self instrumentedDockerInterface) RemoveContainer(opts docker.RemoveContainerOptions) error { func (in instrumentedDockerInterface) RemoveContainer(opts docker.RemoveContainerOptions) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("remove_container").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("remove_container").Observe(SinceInMicroseconds(start))
}() }()
return self.client.RemoveContainer(opts) return in.client.RemoveContainer(opts)
} }
func (self instrumentedDockerInterface) InspectImage(image string) (*docker.Image, error) { func (in instrumentedDockerInterface) InspectImage(image string) (*docker.Image, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("inspect_image").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("inspect_image").Observe(SinceInMicroseconds(start))
}() }()
return self.client.InspectImage(image) return in.client.InspectImage(image)
} }
func (self instrumentedDockerInterface) ListImages(opts docker.ListImagesOptions) ([]docker.APIImages, error) { func (in instrumentedDockerInterface) ListImages(opts docker.ListImagesOptions) ([]docker.APIImages, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("list_images").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("list_images").Observe(SinceInMicroseconds(start))
}() }()
return self.client.ListImages(opts) return in.client.ListImages(opts)
} }
func (self instrumentedDockerInterface) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error { func (in instrumentedDockerInterface) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("pull_image").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("pull_image").Observe(SinceInMicroseconds(start))
}() }()
return self.client.PullImage(opts, auth) return in.client.PullImage(opts, auth)
} }
func (self instrumentedDockerInterface) RemoveImage(image string) error { func (in instrumentedDockerInterface) RemoveImage(image string) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("remove_image").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("remove_image").Observe(SinceInMicroseconds(start))
}() }()
return self.client.RemoveImage(image) return in.client.RemoveImage(image)
} }
func (self instrumentedDockerInterface) Logs(opts docker.LogsOptions) error { func (in instrumentedDockerInterface) Logs(opts docker.LogsOptions) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("logs").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("logs").Observe(SinceInMicroseconds(start))
}() }()
return self.client.Logs(opts) return in.client.Logs(opts)
} }
func (self instrumentedDockerInterface) Version() (*docker.Env, error) { func (in instrumentedDockerInterface) Version() (*docker.Env, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("version").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("version").Observe(SinceInMicroseconds(start))
}() }()
return self.client.Version() return in.client.Version()
} }
func (self instrumentedDockerInterface) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { func (in instrumentedDockerInterface) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("create_exec").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("create_exec").Observe(SinceInMicroseconds(start))
}() }()
return self.client.CreateExec(opts) return in.client.CreateExec(opts)
} }
func (self instrumentedDockerInterface) StartExec(startExec string, opts docker.StartExecOptions) error { func (in instrumentedDockerInterface) StartExec(startExec string, opts docker.StartExecOptions) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
DockerOperationsLatency.WithLabelValues("start_exec").Observe(SinceInMicroseconds(start)) DockerOperationsLatency.WithLabelValues("start_exec").Observe(SinceInMicroseconds(start))
}() }()
return self.client.StartExec(startExec, opts) return in.client.StartExec(startExec, opts)
} }

View File

@ -90,8 +90,8 @@ const (
SyncPodSync SyncPodSync
) )
func (self SyncPodType) String() string { func (sp SyncPodType) String() string {
switch self { switch sp {
case SyncPodCreate: case SyncPodCreate:
return "create" return "create"
case SyncPodUpdate: case SyncPodUpdate:
@ -132,13 +132,13 @@ var (
nil, nil) nil, nil)
) )
func (self *podAndContainerCollector) Describe(ch chan<- *prometheus.Desc) { func (pc *podAndContainerCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- runningPodCountDesc ch <- runningPodCountDesc
ch <- runningContainerCountDesc ch <- runningContainerCountDesc
} }
func (self *podAndContainerCollector) Collect(ch chan<- prometheus.Metric) { func (pc *podAndContainerCollector) Collect(ch chan<- prometheus.Metric) {
runningPods, err := self.containerCache.GetPods() runningPods, err := pc.containerCache.GetPods()
if err != nil { if err != nil {
glog.Warning("Failed to get running container information while collecting metrics: %v", err) glog.Warning("Failed to get running container information while collecting metrics: %v", err)
return return

View File

@ -43,19 +43,19 @@ func newBasicMirrorClient(apiserverClient client.Interface) *basicMirrorClient {
} }
// Creates a mirror pod. // Creates a mirror pod.
func (self *basicMirrorClient) CreateMirrorPod(pod api.Pod) error { func (mc *basicMirrorClient) CreateMirrorPod(pod api.Pod) error {
if self.apiserverClient == nil { if mc.apiserverClient == nil {
return nil return nil
} }
pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType
_, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod) _, err := mc.apiserverClient.Pods(NamespaceDefault).Create(&pod)
return err return err
} }
// Deletes a mirror pod. // Deletes a mirror pod.
func (self *basicMirrorClient) DeleteMirrorPod(podFullName string) error { func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
if self.apiserverClient == nil { if mc.apiserverClient == nil {
return nil return nil
} }
name, namespace, err := kubecontainer.ParsePodFullName(podFullName) name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
@ -64,7 +64,7 @@ func (self *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
return err return err
} }
glog.V(4).Infof("Deleting a mirror pod %q", podFullName) glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil { if err := mc.apiserverClient.Pods(namespace).Delete(name); err != nil {
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err) glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
} }
return nil return nil

View File

@ -34,20 +34,20 @@ type fakeMirrorClient struct {
deleteCounts map[string]int deleteCounts map[string]int
} }
func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod) error { func (fmc *fakeMirrorClient) CreateMirrorPod(pod api.Pod) error {
self.mirrorPodLock.Lock() fmc.mirrorPodLock.Lock()
defer self.mirrorPodLock.Unlock() defer fmc.mirrorPodLock.Unlock()
podFullName := kubecontainer.GetPodFullName(&pod) podFullName := kubecontainer.GetPodFullName(&pod)
self.mirrorPods.Insert(podFullName) fmc.mirrorPods.Insert(podFullName)
self.createCounts[podFullName]++ fmc.createCounts[podFullName]++
return nil return nil
} }
func (self *fakeMirrorClient) DeleteMirrorPod(podFullName string) error { func (fmc *fakeMirrorClient) DeleteMirrorPod(podFullName string) error {
self.mirrorPodLock.Lock() fmc.mirrorPodLock.Lock()
defer self.mirrorPodLock.Unlock() defer fmc.mirrorPodLock.Unlock()
self.mirrorPods.Delete(podFullName) fmc.mirrorPods.Delete(podFullName)
self.deleteCounts[podFullName]++ fmc.deleteCounts[podFullName]++
return nil return nil
} }
@ -59,28 +59,28 @@ func newFakeMirrorClient() *fakeMirrorClient {
return &m return &m
} }
func (self *fakeMirrorClient) HasPod(podFullName string) bool { func (fmc *fakeMirrorClient) HasPod(podFullName string) bool {
self.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return self.mirrorPods.Has(podFullName) return fmc.mirrorPods.Has(podFullName)
} }
func (self *fakeMirrorClient) NumOfPods() int { func (fmc *fakeMirrorClient) NumOfPods() int {
self.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return self.mirrorPods.Len() return fmc.mirrorPods.Len()
} }
func (self *fakeMirrorClient) GetPods() []string { func (fmc *fakeMirrorClient) GetPods() []string {
self.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return self.mirrorPods.List() return fmc.mirrorPods.List()
} }
func (self *fakeMirrorClient) GetCounts(podFullName string) (int, int) { func (fmc *fakeMirrorClient) GetCounts(podFullName string) (int, int) {
self.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return self.createCounts[podFullName], self.deleteCounts[podFullName] return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName]
} }
func TestParsePodFullName(t *testing.T) { func TestParsePodFullName(t *testing.T) {

View File

@ -83,23 +83,23 @@ func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
} }
// Update the internal pods with those provided by the update. // Update the internal pods with those provided by the update.
func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
self.lock.Lock() pm.lock.Lock()
defer self.lock.Unlock() defer pm.lock.Unlock()
switch u.Op { switch u.Op {
case SET: case SET:
glog.V(3).Infof("SET: Containers changed") glog.V(3).Infof("SET: Containers changed")
// Store the new pods. Don't worry about filtering host ports since those // Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up. // pods will never be looked up.
existingPods := make(map[types.UID]struct{}) existingPods := make(map[types.UID]struct{})
for uid := range self.podByUID { for uid := range pm.podByUID {
existingPods[uid] = struct{}{} existingPods[uid] = struct{}{}
} }
// Update the internal pods. // Update the internal pods.
self.setPods(u.Pods) pm.setPods(u.Pods)
for uid := range self.podByUID { for uid := range pm.podByUID {
if _, ok := existingPods[uid]; !ok { if _, ok := existingPods[uid]; !ok {
podSyncTypes[uid] = metrics.SyncPodCreate podSyncTypes[uid] = metrics.SyncPodCreate
} }
@ -112,14 +112,14 @@ func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]
for i := range u.Pods { for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
} }
allPods := applyUpdates(u.Pods, self.getAllPods()) allPods := applyUpdates(u.Pods, pm.getAllPods())
self.setPods(allPods) pm.setPods(allPods)
default: default:
panic("syncLoop does not support incremental changes") panic("syncLoop does not support incremental changes")
} }
// Mark all remaining pods as sync. // Mark all remaining pods as sync.
for uid := range self.podByUID { for uid := range pm.podByUID {
if _, ok := podSyncTypes[uid]; !ok { if _, ok := podSyncTypes[uid]; !ok {
podSyncTypes[uid] = metrics.SyncPodSync podSyncTypes[uid] = metrics.SyncPodSync
} }
@ -127,13 +127,13 @@ func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]
} }
// Set the internal pods based on the new pods. // Set the internal pods based on the new pods.
func (self *basicPodManager) SetPods(newPods []*api.Pod) { func (pm *basicPodManager) SetPods(newPods []*api.Pod) {
self.lock.Lock() pm.lock.Lock()
defer self.lock.Unlock() defer pm.lock.Unlock()
self.setPods(newPods) pm.setPods(newPods)
} }
func (self *basicPodManager) setPods(newPods []*api.Pod) { func (pm *basicPodManager) setPods(newPods []*api.Pod) {
podByUID := make(map[types.UID]*api.Pod) podByUID := make(map[types.UID]*api.Pod)
mirrorPodByUID := make(map[types.UID]*api.Pod) mirrorPodByUID := make(map[types.UID]*api.Pod)
podByFullName := make(map[string]*api.Pod) podByFullName := make(map[string]*api.Pod)
@ -150,10 +150,10 @@ func (self *basicPodManager) setPods(newPods []*api.Pod) {
} }
} }
self.podByUID = podByUID pm.podByUID = podByUID
self.podByFullName = podByFullName pm.podByFullName = podByFullName
self.mirrorPodByUID = mirrorPodByUID pm.mirrorPodByUID = mirrorPodByUID
self.mirrorPodByFullName = mirrorPodByFullName pm.mirrorPodByFullName = mirrorPodByFullName
} }
func applyUpdates(changed []*api.Pod, current []*api.Pod) []*api.Pod { func applyUpdates(changed []*api.Pod, current []*api.Pod) []*api.Pod {
@ -177,42 +177,42 @@ func applyUpdates(changed []*api.Pod, current []*api.Pod) []*api.Pod {
} }
// GetPods returns the regular pods bound to the kubelet and their spec. // GetPods returns the regular pods bound to the kubelet and their spec.
func (self *basicPodManager) GetPods() []*api.Pod { func (pm *basicPodManager) GetPods() []*api.Pod {
self.lock.RLock() pm.lock.RLock()
defer self.lock.RUnlock() defer pm.lock.RUnlock()
return podsMapToPods(self.podByUID) return podsMapToPods(pm.podByUID)
} }
// Returns all pods (including mirror pods). // Returns all pods (including mirror pods).
func (self *basicPodManager) getAllPods() []*api.Pod { func (pm *basicPodManager) getAllPods() []*api.Pod {
return append(podsMapToPods(self.podByUID), podsMapToPods(self.mirrorPodByUID)...) return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
} }
// GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror // GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror
// pods indexed by full name. // pods indexed by full name.
func (self *basicPodManager) GetPodsAndMirrorMap() ([]*api.Pod, map[string]api.Pod) { func (pm *basicPodManager) GetPodsAndMirrorMap() ([]*api.Pod, map[string]api.Pod) {
self.lock.RLock() pm.lock.RLock()
defer self.lock.RUnlock() defer pm.lock.RUnlock()
mirrorPods := make(map[string]api.Pod) mirrorPods := make(map[string]api.Pod)
for key, pod := range self.mirrorPodByFullName { for key, pod := range pm.mirrorPodByFullName {
mirrorPods[key] = *pod mirrorPods[key] = *pod
} }
return podsMapToPods(self.podByUID), mirrorPods return podsMapToPods(pm.podByUID), mirrorPods
} }
// GetPodByName provides the (non-mirror) pod that matches namespace and name, // GetPodByName provides the (non-mirror) pod that matches namespace and name,
// as well as whether the pod was found. // as well as whether the pod was found.
func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) { func (pm *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
podFullName := kubecontainer.BuildPodFullName(name, namespace) podFullName := kubecontainer.BuildPodFullName(name, namespace)
return self.GetPodByFullName(podFullName) return pm.GetPodByFullName(podFullName)
} }
// GetPodByName returns the (non-mirror) pod that matches full name, as well as // GetPodByName returns the (non-mirror) pod that matches full name, as well as
// whether the pod was found. // whether the pod was found.
func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) { func (pm *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
self.lock.RLock() pm.lock.RLock()
defer self.lock.RUnlock() defer pm.lock.RUnlock()
pod, ok := self.podByFullName[podFullName] pod, ok := pm.podByFullName[podFullName]
return pod, ok return pod, ok
} }
@ -220,28 +220,28 @@ func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, boo
// Otherwise, return the original UID. All public-facing functions should // Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID, // perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions. // which is not recognized by internal Kubelet functions.
func (self *basicPodManager) TranslatePodUID(uid types.UID) types.UID { func (pm *basicPodManager) TranslatePodUID(uid types.UID) types.UID {
if uid == "" { if uid == "" {
return uid return uid
} }
self.lock.RLock() pm.lock.RLock()
defer self.lock.RUnlock() defer pm.lock.RUnlock()
if mirrorPod, ok := self.mirrorPodByUID[uid]; ok { if mirrorPod, ok := pm.mirrorPodByUID[uid]; ok {
podFullName := kubecontainer.GetPodFullName(mirrorPod) podFullName := kubecontainer.GetPodFullName(mirrorPod)
if pod, ok := self.podByFullName[podFullName]; ok { if pod, ok := pm.podByFullName[podFullName]; ok {
return pod.UID return pod.UID
} }
} }
return uid return uid
} }
func (self *basicPodManager) getOrphanedMirrorPodNames() []string { func (pm *basicPodManager) getOrphanedMirrorPodNames() []string {
self.lock.RLock() pm.lock.RLock()
defer self.lock.RUnlock() defer pm.lock.RUnlock()
var podFullNames []string var podFullNames []string
for podFullName := range self.mirrorPodByFullName { for podFullName := range pm.mirrorPodByFullName {
if _, ok := self.podByFullName[podFullName]; !ok { if _, ok := pm.podByFullName[podFullName]; !ok {
podFullNames = append(podFullNames, podFullName) podFullNames = append(podFullNames, podFullName)
} }
} }
@ -251,15 +251,15 @@ func (self *basicPodManager) getOrphanedMirrorPodNames() []string {
// Delete all mirror pods which do not have associated static pods. This method // Delete all mirror pods which do not have associated static pods. This method
// sends deletion requets to the API server, but does NOT modify the internal // sends deletion requets to the API server, but does NOT modify the internal
// pod storage in basicPodManager. // pod storage in basicPodManager.
func (self *basicPodManager) DeleteOrphanedMirrorPods() { func (pm *basicPodManager) DeleteOrphanedMirrorPods() {
podFullNames := self.getOrphanedMirrorPodNames() podFullNames := pm.getOrphanedMirrorPodNames()
for _, podFullName := range podFullNames { for _, podFullName := range podFullNames {
self.mirrorClient.DeleteMirrorPod(podFullName) pm.mirrorClient.DeleteMirrorPod(podFullName)
} }
} }
// Returns true if mirrorPod is a correct representation of pod; false otherwise. // Returns true if mirrorPod is a correct representation of pod; false otherwise.
func (self *basicPodManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool { func (pm *basicPodManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool {
// Check name and namespace first. // Check name and namespace first.
if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace { if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {
return false return false