From 801ad909ca1104930bd468fe6c85851c5a79c578 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Tue, 17 Feb 2015 10:53:04 -0800 Subject: [PATCH] Add protection for the pods member varaible. Address comments. --- pkg/kubelet/kubelet.go | 70 +++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 353ed4009bf..7959928d73c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -157,9 +157,14 @@ type Kubelet struct { podInfraContainerImage string podWorkers *podWorkers resyncInterval time.Duration - pods []api.BoundPod sourceReady SourceReadyFn + // Protects the pods array + // We make complete array copies out of this while locked, which is OK because once added to this array, + // pods are immutable + podLock sync.RWMutex + pods []api.BoundPod + // Needed to report events for containers belonging to deleted/modified pods. // Tracks references for reporting events dockerIDToRef map[dockertools.DockerID]*api.ObjectReference @@ -1417,6 +1422,24 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { return filtered } +func (kl *Kubelet) handleUpdate(u PodUpdate) { + kl.podLock.Lock() + defer kl.podLock.Unlock() + switch u.Op { + case SET: + glog.V(3).Infof("SET: Containers changed") + kl.pods = u.Pods + kl.pods = filterHostPortConflicts(kl.pods) + case UPDATE: + glog.V(3).Infof("Update: Containers changed") + kl.pods = updateBoundPods(u.Pods, kl.pods) + kl.pods = filterHostPortConflicts(kl.pods) + + default: + panic("syncLoop does not support incremental changes") + } +} + // syncLoop is the main loop for processing changes. It watches for changes from // four channels (file, etcd, server, and http) and creates a union of them. For // any new change seen, will run a sync against desired state and running state. If @@ -1444,8 +1467,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } } - err := handler.SyncPods(kl.pods) + pods, err := kl.GetBoundPods() if err != nil { + glog.Errorf("Failed to get bound pods.") + return + } + if err := handler.SyncPods(pods); err != nil { glog.Errorf("Couldn't sync containers: %v", err) } } @@ -1514,16 +1541,19 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri // GetBoundPods returns all pods bound to the kubelet and their spec func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) { - return kl.pods, nil + kl.podLock.RLock() + defer kl.podLock.RUnlock() + return append([]api.BoundPod{}, kl.pods...), nil } -// GetPodFullName provides the first pod that matches namespace and name, or false -// if no such pod can be found. +// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found. func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) { + kl.podLock.RLock() + defer kl.podLock.RUnlock() for i := range kl.pods { - pod := &kl.pods[i] + pod := kl.pods[i] if pod.Namespace == namespace && pod.Name == name { - return pod, true + return &pod, true } } return nil, false @@ -1616,23 +1646,27 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio return ready } -// GetPodStatus returns information from Docker about the containers in a pod -func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { - var spec api.PodSpec - var podStatus api.PodStatus - found := false +func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) { + kl.podLock.RLock() + defer kl.podLock.RUnlock() for _, pod := range kl.pods { if GetPodFullName(&pod) == podFullName { - spec = pod.Spec - found = true - break + return &pod.Spec, true } } + return nil, false +} + +// GetPodStatus returns information from Docker about the containers in a pod +func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { + var podStatus api.PodStatus + spec, found := kl.GetPodByFullName(podFullName) + if !found { return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName) } - info, err := dockertools.GetDockerPodInfo(kl.dockerClient, spec, podFullName, uid) + info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid) if err != nil { // Error handling @@ -1648,13 +1682,13 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu } // Assume info is ready to process - podStatus.Phase = getPhase(&spec, info) + podStatus.Phase = getPhase(spec, info) for _, c := range spec.Containers { containerStatus := info[c.Name] containerStatus.Ready = kl.readiness.IsReady(containerStatus) info[c.Name] = containerStatus } - podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(&spec, info)...) + podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, info)...) netContainerInfo, found := info[dockertools.PodInfraContainerName] if found {