From 18b728ff44a450a922a584b5342e1294124d6840 Mon Sep 17 00:00:00 2001 From: Filip Grzadkowski Date: Tue, 17 Mar 2015 13:51:45 +0100 Subject: [PATCH] Revert "Periodically update pod status from kubelet." --- cmd/integration/integration.go | 2 +- pkg/api/validation/validation.go | 1 - pkg/client/fake_pods.go | 5 -- pkg/client/pods.go | 21 ++---- pkg/kubelet/config/apiserver.go | 2 +- pkg/kubelet/dockertools/fake_docker_client.go | 1 - pkg/kubelet/kubelet.go | 67 +++---------------- pkg/kubelet/kubelet_test.go | 22 +++--- pkg/kubelet/pod_workers.go | 2 +- pkg/kubelet/types.go | 8 --- pkg/master/master.go | 21 +++--- 11 files changed, 37 insertions(+), 115 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 575ce9c03ad..8af00379004 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -657,7 +657,7 @@ func runServiceTest(client *client.Client) { glog.Fatalf("Failed to create service: %v, %v", svc1, err) } - // create an identical service in the non-default namespace + // create an identical service in the default namespace svc3 := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "service1"}, Spec: api.ServiceSpec{ diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 34336b29bb1..9bbbf9ba488 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -691,7 +691,6 @@ func ValidatePodStatusUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList { allErrs = append(allErrs, errs.NewFieldInvalid("status.host", newPod.Status.Host, "pod host cannot be changed directly")) } - // For status update we ignore changes to pod spec. newPod.Spec = oldPod.Spec return allErrs diff --git a/pkg/client/fake_pods.go b/pkg/client/fake_pods.go index 1b3eecdb707..d39491211af 100644 --- a/pkg/client/fake_pods.go +++ b/pkg/client/fake_pods.go @@ -63,8 +63,3 @@ func (c *FakePods) Bind(bind *api.Binding) error { c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name}) return nil } - -func (c *FakePods) UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) { - c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-status-pod", Value: name}) - return &api.Pod{}, nil -} diff --git a/pkg/client/pods.go b/pkg/client/pods.go index 424a2feab5c..06fdfd78aa3 100644 --- a/pkg/client/pods.go +++ b/pkg/client/pods.go @@ -39,7 +39,6 @@ type PodInterface interface { Update(pod *api.Pod) (*api.Pod, error) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) Bind(binding *api.Binding) error - UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) } // pods implements PodsNamespacer interface @@ -63,7 +62,7 @@ func (c *pods) List(selector labels.Selector) (result *api.PodList, err error) { return } -// Get takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs +// GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs func (c *pods) Get(name string) (result *api.Pod, err error) { if len(name) == 0 { return nil, errors.New("name is required parameter to Get") @@ -74,19 +73,19 @@ func (c *pods) Get(name string) (result *api.Pod, err error) { return } -// Delete takes the name of the pod, and returns an error if one occurs +// DeletePod takes the name of the pod, and returns an error if one occurs func (c *pods) Delete(name string) error { return c.r.Delete().Namespace(c.ns).Resource("pods").Name(name).Do().Error() } -// Create takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs. +// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs. func (c *pods) Create(pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} err = c.r.Post().Namespace(c.ns).Resource("pods").Body(pod).Do().Into(result) return } -// Update takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs. +// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs. func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} if len(pod.ResourceVersion) == 0 { @@ -113,15 +112,3 @@ func (c *pods) Watch(label, field labels.Selector, resourceVersion string) (watc func (c *pods) Bind(binding *api.Binding) error { return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error() } - -// UpdateStatus takes the name of the pod and the new status. Returns the server's representation of the pod, and an error, if it occurs. -func (c *pods) UpdateStatus(name string, newStatus *api.PodStatus) (result *api.Pod, err error) { - result = &api.Pod{} - pod, err := c.Get(name) - if err != nil { - return - } - pod.Status = *newStatus - err = c.r.Put().Namespace(c.ns).Resource("pods").Name(pod.Name).SubResource("status").Body(pod).Do().Into(result) - return -} diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index 9dd4440915d..418b8b14b43 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -31,7 +31,7 @@ func NewSourceApiserver(client *client.Client, hostname string, updates chan<- i newSourceApiserverFromLW(lw, updates) } -// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver. +// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver. func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) { send := func(objs []interface{}) { var pods []api.Pod diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 35da4c01527..01d14cb2503 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -126,7 +126,6 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf Running: true, Pid: 42, }, - NetworkSettings: &docker.NetworkSettings{IPAddress: "1.2.3.4"}, } return f.Err } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 29de7d9515f..dc60bb880f9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -285,7 +285,7 @@ type Kubelet struct { // the EventRecorder to use recorder record.EventRecorder - // A pod status cache stores statuses for pods (both rejected and synced). + // A pod status cache currently used to store rejected pods and their statuses. podStatusesLock sync.RWMutex podStatuses map[string]api.PodStatus } @@ -568,7 +568,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { glog.Warning("No api server defined - no node status update will be sent.") } go kl.syncNodeStatus() - go util.Forever(kl.syncStatus, kl.resyncInterval) kl.syncLoop(updates, kl) } @@ -1347,17 +1346,6 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dock func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerContainers) error { podFullName := GetPodFullName(pod) uid := pod.UID - - // Before returning, regenerate status and store it in the cache. - defer func() { - status, err := kl.generatePodStatus(podFullName, uid) - if err != nil { - glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err) - } else { - kl.setPodStatusInCache(podFullName, status) - } - }() - containerChanges, err := kl.computePodContainerChanges(pod, containersInPod) glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) if err != nil { @@ -1727,40 +1715,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } } -// syncStatus syncs pods statuses with the apiserver. -func (kl *Kubelet) syncStatus() { - glog.V(3).Infof("Syncing pods status") - - statuses := make(map[string]api.PodStatus) - func() { - kl.podLock.Lock() - defer kl.podLock.Unlock() - for _, pod := range kl.pods { - source := pod.Annotations[ConfigSourceAnnotationKey] - if source != ApiserverSource { - glog.V(3).Infof("Pod status for %q is not updated due to its source %s", pod.Name, source) - continue - } - status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID) - if err != nil { - glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err) - continue - } - statuses[GetPodFullName(&pod)] = status - } - }() - - for podFullName, status := range statuses { - name, namespace := ParsePodFullName(podFullName) - pod, err := kl.kubeClient.Pods(namespace).UpdateStatus(name, &status) - if err != nil { - glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", name, err, pod) - } else { - glog.V(3).Infof("Status for pod %q updated successfully: %s", name, pod) - } - } -} - // Updated the Kubelet's internal pods with those provided by the update. // Records new and updated pods in newPods and updatedPods. func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { @@ -2046,23 +2000,19 @@ func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) { // GetPodStatus returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { - // Check to see if we have a cached version of the status. - cachedPodStatus, found := kl.getPodStatusFromCache(podFullName) - if found { - glog.V(3).Infof("Returning cached status for %s", podFullName) - return cachedPodStatus, nil - } - return kl.generatePodStatus(podFullName, uid) -} - -func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { - glog.V(3).Infof("Generating status for %s", podFullName) var podStatus api.PodStatus spec, found := kl.GetPodByFullName(podFullName) + if !found { return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName) } + // Check to see if the pod has been rejected. + mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName) + if ok { + return mappedPodStatus, nil + } + info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid) if err != nil { @@ -2092,7 +2042,6 @@ func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.Pod if found { podStatus.PodIP = netContainerInfo.PodIP } - podStatus.Host = kl.hostname return podStatus, nil } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 93f52f6b208..b9414f4be9f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { t.Errorf("unexpected error: %v", err) } waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"}) } func TestSyncPodsWithTerminationLog(t *testing.T) { @@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { } waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", "list", "create", "start", "inspect_container", "create", "start"}) fakeDocker.Lock() parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") @@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", "list", "create", "start", "inspect_container", "create", "start"}) fakeDocker.Lock() @@ -584,7 +584,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", "list", "create", "start", "inspect_container", "create", "start"}) fakeDocker.Lock() @@ -634,7 +634,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -691,7 +691,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -760,7 +760,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", "list", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -898,7 +898,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "stop", "list"}) + verifyCalls(t, fakeDocker, []string{"list", "stop"}) // Expect one of the duplicates to be killed. if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") { t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) @@ -940,7 +940,7 @@ func TestSyncPodBadHash(t *testing.T) { } //verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"}) - verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"}) // A map interation is used to delete containers, so must not depend on // order here. @@ -993,7 +993,7 @@ func TestSyncPodUnhealthy(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"}) // A map interation is used to delete containers, so must not depend on // order here. @@ -1683,7 +1683,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop", "list"}) + verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop"}) if len(fakeDocker.Stopped) != 1 { t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 3f0810f333d..e0c169099ab 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -31,7 +31,7 @@ import ( type syncPodFnType func(*api.Pod, dockertools.DockerContainers) error type podWorkers struct { - // Protects all per worker fields. + // Protects podUpdates field. podLock sync.Mutex // Tracks all running per-pod goroutines - per-pod goroutine will be diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index f15d1320bdc..8065c08c41c 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -18,7 +18,6 @@ package kubelet import ( "fmt" - "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) @@ -69,19 +68,12 @@ type PodUpdate struct { } // GetPodFullName returns a name that uniquely identifies a pod across all config sources. -// NOTE: If changed ParsePodFullName must be also updated. func GetPodFullName(pod *api.Pod) string { // Use underscore as the delimiter because it is not allowed in pod name // (DNS subdomain format), while allowed in the container name format. return fmt.Sprintf("%s_%s", pod.Name, pod.Namespace) } -// ParsePodFullName parses full name generated by GetPodFullName and returns parts of it. -func ParsePodFullName(podFullName string) (name, namespace string) { - nameParts := strings.Split(podFullName, "_") - return nameParts[0], nameParts[1] -} - // Build the pod full name from pod name and namespace. func BuildPodFullName(name, namespace string) string { return name + "_" + namespace diff --git a/pkg/master/master.go b/pkg/master/master.go index 473613d73fc..31046ec65f6 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -371,19 +371,20 @@ func (m *Master) init(c *Config) { m.nodeRegistry = registry nodeStorage := minion.NewREST(m.nodeRegistry) + // TODO: unify the storage -> registry and storage -> client patterns + nodeStorageClient := RESTStorageToNodes(nodeStorage) + podCache := NewPodCache( + c.KubeletClient, + nodeStorageClient.Nodes(), + podRegistry, + ) if c.SyncPodStatus { - // TODO: unify the storage -> registry and storage -> client patterns - nodeStorageClient := RESTStorageToNodes(nodeStorage) - - podCache := NewPodCache( - c.KubeletClient, - nodeStorageClient.Nodes(), - podRegistry, - ) go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout) - go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30) - podStorage = podStorage.WithPodStatus(podCache) } + go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30) + + // TODO: refactor podCache to sit on top of podStorage via status calls + podStorage = podStorage.WithPodStatus(podCache) // TODO: Factor out the core API registration m.storage = map[string]apiserver.RESTStorage{