mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Periodically update pod status from kubelet.
This commit is contained in:
parent
faab509a23
commit
1a352b74ba
@ -716,7 +716,7 @@ func runServiceTest(client *client.Client) {
|
||||
glog.Fatalf("Failed to create service: %v, %v", svc1, err)
|
||||
}
|
||||
|
||||
// create an identical service in the default namespace
|
||||
// create an identical service in the non-default namespace
|
||||
svc3 := &api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "service1"},
|
||||
Spec: api.ServiceSpec{
|
||||
|
@ -691,6 +691,7 @@ func ValidatePodStatusUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList {
|
||||
allErrs = append(allErrs, errs.NewFieldInvalid("status.host", newPod.Status.Host, "pod host cannot be changed directly"))
|
||||
}
|
||||
|
||||
// For status update we ignore changes to pod spec.
|
||||
newPod.Spec = oldPod.Spec
|
||||
|
||||
return allErrs
|
||||
|
@ -64,3 +64,8 @@ func (c *FakePods) Bind(bind *api.Binding) error {
|
||||
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *FakePods) UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) {
|
||||
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-status-pod", Value: name})
|
||||
return &api.Pod{}, nil
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ type PodInterface interface {
|
||||
Update(pod *api.Pod) (*api.Pod, error)
|
||||
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
|
||||
Bind(binding *api.Binding) error
|
||||
UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error)
|
||||
}
|
||||
|
||||
// pods implements PodsNamespacer interface
|
||||
@ -63,7 +64,7 @@ func (c *pods) List(selector labels.Selector) (result *api.PodList, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
|
||||
// Get takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
|
||||
func (c *pods) Get(name string) (result *api.Pod, err error) {
|
||||
if len(name) == 0 {
|
||||
return nil, errors.New("name is required parameter to Get")
|
||||
@ -74,19 +75,19 @@ func (c *pods) Get(name string) (result *api.Pod, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// DeletePod takes the name of the pod, and returns an error if one occurs
|
||||
// Delete takes the name of the pod, and returns an error if one occurs
|
||||
func (c *pods) Delete(name string) error {
|
||||
return c.r.Delete().Namespace(c.ns).Resource("pods").Name(name).Do().Error()
|
||||
}
|
||||
|
||||
// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
|
||||
// Create takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
|
||||
func (c *pods) Create(pod *api.Pod) (result *api.Pod, err error) {
|
||||
result = &api.Pod{}
|
||||
err = c.r.Post().Namespace(c.ns).Resource("pods").Body(pod).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
|
||||
// Update takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
|
||||
func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) {
|
||||
result = &api.Pod{}
|
||||
if len(pod.ResourceVersion) == 0 {
|
||||
@ -113,3 +114,15 @@ func (c *pods) Watch(label labels.Selector, field fields.Selector, resourceVersi
|
||||
func (c *pods) Bind(binding *api.Binding) error {
|
||||
return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error()
|
||||
}
|
||||
|
||||
// UpdateStatus takes the name of the pod and the new status. Returns the server's representation of the pod, and an error, if it occurs.
|
||||
func (c *pods) UpdateStatus(name string, newStatus *api.PodStatus) (result *api.Pod, err error) {
|
||||
result = &api.Pod{}
|
||||
pod, err := c.Get(name)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
pod.Status = *newStatus
|
||||
err = c.r.Put().Namespace(c.ns).Resource("pods").Name(pod.Name).SubResource("status").Body(pod).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ func NewSourceApiserver(client *client.Client, hostname string, updates chan<- i
|
||||
newSourceApiserverFromLW(lw, updates)
|
||||
}
|
||||
|
||||
// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver.
|
||||
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
|
||||
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
|
||||
send := func(objs []interface{}) {
|
||||
var pods []api.Pod
|
||||
|
@ -126,6 +126,7 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
|
||||
Running: true,
|
||||
Pid: 42,
|
||||
},
|
||||
NetworkSettings: &docker.NetworkSettings{IPAddress: "1.2.3.4"},
|
||||
}
|
||||
return f.Err
|
||||
}
|
||||
|
@ -293,7 +293,7 @@ type Kubelet struct {
|
||||
// the EventRecorder to use
|
||||
recorder record.EventRecorder
|
||||
|
||||
// A pod status cache currently used to store rejected pods and their statuses.
|
||||
// A pod status cache stores statuses for pods (both rejected and synced).
|
||||
podStatusesLock sync.RWMutex
|
||||
podStatuses map[string]api.PodStatus
|
||||
|
||||
@ -487,6 +487,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
|
||||
glog.Warning("No api server defined - no node status update will be sent.")
|
||||
}
|
||||
go kl.syncNodeStatus()
|
||||
go util.Forever(kl.syncStatus, kl.resyncInterval)
|
||||
kl.syncLoop(updates, kl)
|
||||
}
|
||||
|
||||
@ -1265,6 +1266,17 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
|
||||
func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error {
|
||||
podFullName := GetPodFullName(pod)
|
||||
uid := pod.UID
|
||||
|
||||
// Before returning, regenerate status and store it in the cache.
|
||||
defer func() {
|
||||
status, err := kl.generatePodStatus(podFullName, uid)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
|
||||
} else {
|
||||
kl.setPodStatusInCache(podFullName, status)
|
||||
}
|
||||
}()
|
||||
|
||||
containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod)
|
||||
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
|
||||
if err != nil {
|
||||
@ -1633,17 +1645,33 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
||||
}
|
||||
}
|
||||
|
||||
pods, mirrorPods, err := kl.GetPods()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get bound pods.")
|
||||
return
|
||||
}
|
||||
pods, mirrorPods := kl.GetPods()
|
||||
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
|
||||
glog.Errorf("Couldn't sync containers: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// syncStatus syncs pods statuses with the apiserver.
|
||||
func (kl *Kubelet) syncStatus() {
|
||||
glog.V(3).Infof("Syncing pods status")
|
||||
|
||||
pods, _ := kl.GetPods()
|
||||
for _, pod := range pods {
|
||||
status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID)
|
||||
if err != nil {
|
||||
glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err)
|
||||
continue
|
||||
}
|
||||
_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
|
||||
if err != nil {
|
||||
glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", pod.Name, err, pod)
|
||||
} else {
|
||||
glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the Kubelet's internal pods with those provided by the update.
|
||||
// Records new and updated pods in newPods and updatedPods.
|
||||
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
|
||||
@ -1753,10 +1781,10 @@ func (kl *Kubelet) GetHostname() string {
|
||||
|
||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
||||
// pod map.
|
||||
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet, error) {
|
||||
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
|
||||
kl.podLock.RLock()
|
||||
defer kl.podLock.RUnlock()
|
||||
return append([]api.Pod{}, kl.pods...), kl.mirrorPods, nil
|
||||
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
|
||||
}
|
||||
|
||||
// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found.
|
||||
@ -1934,19 +1962,23 @@ func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
|
||||
|
||||
// GetPodStatus returns information from Docker about the containers in a pod
|
||||
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
|
||||
// Check to see if we have a cached version of the status.
|
||||
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
|
||||
if found {
|
||||
glog.V(3).Infof("Returning cached status for %s", podFullName)
|
||||
return cachedPodStatus, nil
|
||||
}
|
||||
return kl.generatePodStatus(podFullName, uid)
|
||||
}
|
||||
|
||||
func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
|
||||
glog.V(3).Infof("Generating status for %s", podFullName)
|
||||
var podStatus api.PodStatus
|
||||
spec, found := kl.GetPodByFullName(podFullName)
|
||||
|
||||
if !found {
|
||||
return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
|
||||
}
|
||||
|
||||
// Check to see if the pod has been rejected.
|
||||
mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
|
||||
if ok {
|
||||
return mappedPodStatus, nil
|
||||
}
|
||||
|
||||
info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)
|
||||
|
||||
if err != nil {
|
||||
@ -1976,6 +2008,7 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
|
||||
if found {
|
||||
podStatus.PodIP = netContainerInfo.PodIP
|
||||
}
|
||||
podStatus.Host = kl.hostname
|
||||
|
||||
return podStatus, nil
|
||||
}
|
||||
|
@ -449,7 +449,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
waitGroup.Wait()
|
||||
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"})
|
||||
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container"})
|
||||
}
|
||||
|
||||
func TestSyncPodsWithTerminationLog(t *testing.T) {
|
||||
@ -483,7 +483,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
|
||||
}
|
||||
waitGroup.Wait()
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
|
||||
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
|
||||
|
||||
fakeDocker.Lock()
|
||||
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
|
||||
@ -533,7 +533,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
|
||||
waitGroup.Wait()
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
|
||||
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
|
||||
|
||||
fakeDocker.Lock()
|
||||
|
||||
@ -586,7 +586,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
||||
waitGroup.Wait()
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
|
||||
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
|
||||
|
||||
fakeDocker.Lock()
|
||||
|
||||
@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
|
||||
waitGroup.Wait()
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
|
||||
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})
|
||||
|
||||
fakeDocker.Lock()
|
||||
if len(fakeDocker.Created) != 1 ||
|
||||
@ -693,7 +693,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
||||
waitGroup.Wait()
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
|
||||
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})
|
||||
|
||||
fakeDocker.Lock()
|
||||
if len(fakeDocker.Created) != 1 ||
|
||||
@ -762,7 +762,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
|
||||
waitGroup.Wait()
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
"list", "list", "list", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start"})
|
||||
"list", "list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
|
||||
|
||||
// A map iteration is used to delete containers, so must not depend on
|
||||
// order here.
|
||||
@ -900,7 +900,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop"})
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "list"})
|
||||
// Expect one of the duplicates to be killed.
|
||||
if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
|
||||
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
|
||||
@ -942,7 +942,7 @@ func TestSyncPodBadHash(t *testing.T) {
|
||||
}
|
||||
|
||||
//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"})
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
|
||||
|
||||
// A map interation is used to delete containers, so must not depend on
|
||||
// order here.
|
||||
@ -995,7 +995,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list", "inspect_container"})
|
||||
|
||||
// A map interation is used to delete containers, so must not depend on
|
||||
// order here.
|
||||
@ -1685,7 +1685,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop"})
|
||||
verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop", "list"})
|
||||
|
||||
if len(fakeDocker.Stopped) != 1 {
|
||||
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error
|
||||
|
||||
type podWorkers struct {
|
||||
// Protects podUpdates field.
|
||||
// Protects all per worker fields.
|
||||
podLock sync.Mutex
|
||||
|
||||
// Tracks all running per-pod goroutines - per-pod goroutine will be
|
||||
|
@ -85,7 +85,7 @@ type HostInterface interface {
|
||||
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||
GetDockerVersion() ([]uint, error)
|
||||
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
|
||||
GetPods() ([]api.Pod, util.StringSet, error)
|
||||
GetPods() ([]api.Pod, util.StringSet)
|
||||
GetPodByName(namespace, name string) (*api.Pod, bool)
|
||||
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
|
||||
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
|
||||
@ -261,11 +261,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
// handlePods returns a list of pod bound to the Kubelet and their spec
|
||||
func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
|
||||
pods, _, err := s.host.GetPods()
|
||||
if err != nil {
|
||||
s.error(w, err)
|
||||
return
|
||||
}
|
||||
pods, _ := s.host.GetPods()
|
||||
podList := &api.PodList{
|
||||
Items: pods,
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ type fakeKubelet struct {
|
||||
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||
rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||
machineInfoFunc func() (*cadvisorApi.MachineInfo, error)
|
||||
podsFunc func() ([]api.Pod, util.StringSet, error)
|
||||
podsFunc func() ([]api.Pod, util.StringSet)
|
||||
logFunc func(w http.ResponseWriter, req *http.Request)
|
||||
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
|
||||
dockerVersionFunc func() ([]uint, error)
|
||||
@ -80,7 +80,7 @@ func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
|
||||
return fk.machineInfoFunc()
|
||||
}
|
||||
|
||||
func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet, error) {
|
||||
func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet) {
|
||||
return fk.podsFunc()
|
||||
}
|
||||
|
||||
|
@ -378,13 +378,12 @@ func (m *Master) init(c *Config) {
|
||||
nodeStorageClient.Nodes(),
|
||||
podRegistry,
|
||||
)
|
||||
if c.SyncPodStatus {
|
||||
go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
|
||||
}
|
||||
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)
|
||||
|
||||
// TODO: refactor podCache to sit on top of podStorage via status calls
|
||||
podStorage = podStorage.WithPodStatus(podCache)
|
||||
if c.SyncPodStatus {
|
||||
go util.Forever(podCache.UpdateAllContainers, m.cacheTimeout)
|
||||
go util.Forever(podCache.GarbageCollectPodStatus, time.Minute*30)
|
||||
podStorage = podStorage.WithPodStatus(podCache)
|
||||
}
|
||||
|
||||
// TODO: Factor out the core API registration
|
||||
m.storage = map[string]apiserver.RESTStorage{
|
||||
|
Loading…
Reference in New Issue
Block a user