Move kubelet.statusManager to status.Manager

This refactor is in preparation for moving more state handling to the
status manager. It will become the canonical cache for the latest
information on running containers and probe status, as part of the
prober refactoring.
Author: Tim St. Clair, 2015-09-11 12:22:01 -07:00
parent 4bd638921f
commit 1f91fffb57
5 changed files with 84 additions and 58 deletions
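For orientation before the diffs, here is a minimal sketch of how a caller drives the extracted status package after this change. The identifiers it uses (status.NewManager, Start, SetPodStatus, GetPodStatus, the api and types packages) all appear in the diffs below; the main wrapper, the example pod, and the nil client are illustrative assumptions, not code from this commit. As in the run-once test below, a nil client simply disables the apiserver sync loop while the manager keeps caching statuses.

    package main

    import (
        "fmt"

        "k8s.io/kubernetes/pkg/api"
        "k8s.io/kubernetes/pkg/kubelet/status"
        "k8s.io/kubernetes/pkg/types"
    )

    func main() {
        // NewManager now lives in the status package; the kubelet wires it into the
        // Kubelet struct as `statusManager status.Manager` (first set of hunks below).
        m := status.NewManager(nil) // nil client: Start() skips the apiserver sync loop
        m.Start()

        // Cache a status for a hypothetical pod, then read it back.
        pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "nginx", Namespace: "default", UID: types.UID("1234")}}
        m.SetPodStatus(pod, api.PodStatus{Phase: api.PodRunning})

        if st, ok := m.GetPodStatus(pod.UID); ok {
            fmt.Printf("cached phase for %s: %s\n", pod.Name, st.Phase)
        }
    }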


@@ -52,6 +52,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/metrics"
     "k8s.io/kubernetes/pkg/kubelet/network"
     "k8s.io/kubernetes/pkg/kubelet/rkt"
+    "k8s.io/kubernetes/pkg/kubelet/status"
     kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
     kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
     "k8s.io/kubernetes/pkg/labels"
@@ -244,7 +245,7 @@ func NewMainKubelet(
     if err != nil {
         return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
     }
-    statusManager := newStatusManager(kubeClient)
+    statusManager := status.NewManager(kubeClient)
     readinessManager := kubecontainer.NewReadinessManager()
     containerRefManager := kubecontainer.NewRefManager()
@@ -503,7 +504,7 @@ type Kubelet struct {
     machineInfo *cadvisorApi.MachineInfo

     // Syncs pods statuses with apiserver; also used as a cache of statuses.
-    statusManager *statusManager
+    statusManager status.Manager

     // Manager for the volume maps for the pods.
     volumeManager *volumeManager


@@ -44,6 +44,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/container"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/network"
+    "k8s.io/kubernetes/pkg/kubelet/status"
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/types"
     "k8s.io/kubernetes/pkg/util"
@@ -105,7 +106,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
     kubelet.nodeLister = testNodeLister{}
     kubelet.readinessManager = kubecontainer.NewReadinessManager()
     kubelet.recorder = fakeRecorder
-    kubelet.statusManager = newStatusManager(fakeKubeClient)
+    kubelet.statusManager = status.NewManager(fakeKubeClient)
     if err := kubelet.setupDataDirs(); err != nil {
         t.Fatalf("can't initialize kubelet data dirs: %v", err)
     }


@@ -30,6 +30,7 @@ import (
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/dockertools"
     "k8s.io/kubernetes/pkg/kubelet/network"
+    "k8s.io/kubernetes/pkg/kubelet/status"
 )

 type listContainersResult struct {
@@ -83,7 +84,7 @@ func TestRunOnce(t *testing.T) {
         recorder: &record.FakeRecorder{},
         cadvisor: cadvisor,
         nodeLister: testNodeLister{},
-        statusManager: newStatusManager(nil),
+        statusManager: status.NewManager(nil),
         containerRefManager: kubecontainer.NewRefManager(),
         readinessManager: kubecontainer.NewReadinessManager(),
         podManager: podManager,


@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package kubelet
+package status

 import (
     "fmt"
@@ -39,16 +39,42 @@ type podStatusSyncRequest struct {

 // Updates pod statuses in apiserver. Writes only when new status has changed.
 // All methods are thread-safe.
-type statusManager struct {
+type manager struct {
     kubeClient client.Interface
     // Map from pod full name to sync status of the corresponding pod.
-    podStatusesLock sync.RWMutex
     podStatuses map[types.UID]api.PodStatus
+    podStatusesLock sync.RWMutex
     podStatusChannel chan podStatusSyncRequest
 }

-func newStatusManager(kubeClient client.Interface) *statusManager {
-    return &statusManager{
+// status.Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
+// the latest api.PodStatus. It also syncs updates back to the API server.
+type Manager interface {
+    // Start the API server status sync loop.
+    Start()
+
+    // GetPodStatus returns the cached status for the provided pod UID, as well as whether it
+    // was a cache hit.
+    GetPodStatus(uid types.UID) (api.PodStatus, bool)
+
+    // SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
+    SetPodStatus(pod *api.Pod, status api.PodStatus)
+
+    // TerminatePods resets the container status for the provided pods to terminated and triggers
+    // a status update. This function may not enqueue all the provided pods, in which case it will
+    // return false
+    TerminatePods(pods []*api.Pod) bool
+
+    // DeletePodStatus simply removes the given pod from the status cache.
+    DeletePodStatus(uid types.UID)
+
+    // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
+    // the provided podUIDs.
+    RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
+}
+
+func NewManager(kubeClient client.Interface) Manager {
+    return &manager{
         kubeClient: kubeClient,
         podStatuses: make(map[types.UID]api.PodStatus),
         podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
@@ -65,35 +91,35 @@ func isStatusEqual(oldStatus, status *api.PodStatus) bool {
     return reflect.DeepEqual(status, oldStatus)
 }

-func (s *statusManager) Start() {
+func (m *manager) Start() {
     // Don't start the status manager if we don't have a client. This will happen
     // on the master, where the kubelet is responsible for bootstrapping the pods
     // of the master components.
-    if s.kubeClient == nil {
+    if m.kubeClient == nil {
         glog.Infof("Kubernetes client is nil, not starting status manager.")
         return
     }
     // syncBatch blocks when no updates are available, we can run it in a tight loop.
     glog.Info("Starting to sync pod status with apiserver")
     go util.Until(func() {
-        err := s.syncBatch()
+        err := m.syncBatch()
         if err != nil {
             glog.Warningf("Failed to updated pod status: %v", err)
         }
     }, 0, util.NeverStop)
 }

-func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
-    s.podStatusesLock.RLock()
-    defer s.podStatusesLock.RUnlock()
-    status, ok := s.podStatuses[uid]
+func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
+    m.podStatusesLock.RLock()
+    defer m.podStatusesLock.RUnlock()
+    status, ok := m.podStatuses[uid]
     return status, ok
 }

-func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
-    oldStatus, found := s.podStatuses[pod.UID]
+func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
+    oldStatus, found := m.podStatuses[pod.UID]

     // ensure that the start time does not change across updates.
     if found && oldStatus.StartTime != nil {
@@ -102,7 +128,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {

     // if the status has no start time, we need to set an initial time
     // TODO(yujuhong): Consider setting StartTime when generating the pod
-    // status instead, which would allow statusManager to become a simple cache
+    // status instead, which would allow manager to become a simple cache
     // again.
     if status.StartTime.IsZero() {
         if pod.Status.StartTime.IsZero() {
@@ -123,20 +149,17 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
     // workers and/or the kubelet but dropping the lock before sending the
     // status down the channel feels like an easy way to get a bullet in foot.
     if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
-        s.podStatuses[pod.UID] = status
-        s.podStatusChannel <- podStatusSyncRequest{pod, status}
+        m.podStatuses[pod.UID] = status
+        m.podStatusChannel <- podStatusSyncRequest{pod, status}
     } else {
         glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
     }
 }

-// TerminatePods resets the container status for the provided pods to terminated and triggers
-// a status update. This function may not enqueue all the provided pods, in which case it will
-// return false
-func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+func (m *manager) TerminatePods(pods []*api.Pod) bool {
     sent := true
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
     for _, pod := range pods {
         for i := range pod.Status.ContainerStatuses {
             pod.Status.ContainerStatuses[i].State = api.ContainerState{
@@ -144,7 +167,7 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
             }
         }
         select {
-        case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+        case m.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
         default:
             sent = false
             glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
@@ -153,27 +176,27 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
     return sent
 }

-func (s *statusManager) DeletePodStatus(uid types.UID) {
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
-    delete(s.podStatuses, uid)
+func (m *manager) DeletePodStatus(uid types.UID) {
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
+    delete(m.podStatuses, uid)
 }

 // TODO(filipg): It'd be cleaner if we can do this without signal from user.
-func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
-    for key := range s.podStatuses {
+func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
+    for key := range m.podStatuses {
         if _, ok := podUIDs[key]; !ok {
             glog.V(5).Infof("Removing %q from status map.", key)
-            delete(s.podStatuses, key)
+            delete(m.podStatuses, key)
         }
     }
 }

 // syncBatch syncs pods statuses with the apiserver.
-func (s *statusManager) syncBatch() error {
-    syncRequest := <-s.podStatusChannel
+func (m *manager) syncBatch() error {
+    syncRequest := <-m.podStatusChannel
     pod := syncRequest.pod
     status := syncRequest.status
@@ -182,7 +205,7 @@ func (s *statusManager) syncBatch() error {
         ObjectMeta: pod.ObjectMeta,
     }
     // TODO: make me easier to express from client code
-    statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
+    statusPod, err = m.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
     if errors.IsNotFound(err) {
         glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
         return nil
@@ -194,7 +217,7 @@ func (s *statusManager) syncBatch() error {
     }
     statusPod.Status = status
     // TODO: handle conflict as a retry, make that easier too.
-    statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
+    statusPod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
     if err == nil {
         glog.V(3).Infof("Status for pod %q updated successfully", kubeletUtil.FormatPodName(pod))
@@ -205,9 +228,9 @@ func (s *statusManager) syncBatch() error {
             glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
             return nil
         }
-        if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
+        if err := m.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
             glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
-            s.DeletePodStatus(pod.UID)
+            m.DeletePodStatus(pod.UID)
             return nil
         }
     }
@@ -220,7 +243,7 @@ func (s *statusManager) syncBatch() error {
     // is full, and the pod worker holding the lock is waiting on this method
     // to clear the channel. Even if this delete never runs subsequent container
     // changes on the node should trigger updates.
-    go s.DeletePodStatus(pod.UID)
+    go m.DeletePodStatus(pod.UID)
     return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
 }
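Taken together, the hunks above show the pattern the manager is built on: a status map guarded by an RWMutex sitting in front of a buffered channel, with a single sync loop draining one request at a time and skipping the write when the status has not changed. The following self-contained sketch distills that pattern with plain string statuses and a print standing in for the apiserver update; the names (cacheManager, Set, Get, syncOnce) are invented for the illustration and are not part of the commit.

    package main

    import (
        "fmt"
        "sync"
    )

    type syncRequest struct {
        uid    string
        status string
    }

    type cacheManager struct {
        mu       sync.RWMutex
        statuses map[string]string
        updates  chan syncRequest // buffered, like podStatusChannel
    }

    func newCacheManager() *cacheManager {
        return &cacheManager{
            statuses: make(map[string]string),
            updates:  make(chan syncRequest, 1000), // mirror the "Buffer up to 1000 statuses" choice
        }
    }

    // Set caches the status and enqueues a sync request only when the status
    // actually changed, mirroring SetPodStatus plus the isStatusEqual check.
    func (c *cacheManager) Set(uid, status string) {
        c.mu.Lock()
        defer c.mu.Unlock()
        if old, ok := c.statuses[uid]; ok && old == status {
            return // unchanged: skip the apiserver round trip
        }
        c.statuses[uid] = status
        c.updates <- syncRequest{uid, status}
    }

    // Get returns the cached status, mirroring GetPodStatus.
    func (c *cacheManager) Get(uid string) (string, bool) {
        c.mu.RLock()
        defer c.mu.RUnlock()
        s, ok := c.statuses[uid]
        return s, ok
    }

    // syncOnce blocks on the next request and "writes" it, the way syncBatch
    // does before util.Until calls it again in a tight loop.
    func (c *cacheManager) syncOnce() {
        req := <-c.updates
        fmt.Printf("sync %s -> %s\n", req.uid, req.status)
    }

    func main() {
        m := newCacheManager()
        m.Set("pod-1", "Running")
        m.Set("pod-1", "Running") // unchanged, not enqueued
        m.Set("pod-1", "Succeeded")
        m.syncOnce() // sync pod-1 -> Running
        m.syncOnce() // sync pod-1 -> Succeeded
        if s, ok := m.Get("pod-1"); ok {
            fmt.Println("cached:", s) // cached: Succeeded
        }
    }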


@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package kubelet
+package status

 import (
     "fmt"
@@ -37,8 +37,8 @@ var testPod *api.Pod = &api.Pod{
     },
 }

-func newTestStatusManager() *statusManager {
-    return newStatusManager(&testclient.Fake{})
+func newTestManager() *manager {
+    return NewManager(&testclient.Fake{}).(*manager)
 }

 func generateRandomMessage() string {
@@ -66,7 +66,7 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []
     }
 }

-func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
+func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
     // Consume all updates in the channel.
     numUpdates := 0
     for {
@@ -89,7 +89,7 @@ func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
 }

 func TestNewStatus(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     verifyUpdates(t, syncer, 1)

@@ -100,7 +100,7 @@ func TestNewStatus(t *testing.T) {
 }

 func TestNewStatusPreservesPodStartTime(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     pod := &api.Pod{
         ObjectMeta: api.ObjectMeta{
             UID: "12345678",
@@ -121,14 +121,14 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
 }

 func TestChangedStatus(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     verifyUpdates(t, syncer, 2)
 }

 func TestChangedStatusKeepsStartTime(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     now := util.Now()
     firstStatus := getRandomPodStatus()
     firstStatus.StartTime = &now
@@ -145,7 +145,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
 }

 func TestUnchangedStatus(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     podStatus := getRandomPodStatus()
     syncer.SetPodStatus(testPod, podStatus)
     syncer.SetPodStatus(testPod, podStatus)
@@ -153,7 +153,7 @@ func TestUnchangedStatus(t *testing.T) {
 }

 func TestSyncBatchIgnoresNotFound(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     err := syncer.syncBatch()
     if err != nil {
@@ -165,7 +165,7 @@ func TestSyncBatchIgnoresNotFound(t *testing.T) {
 }

 func TestSyncBatch(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.kubeClient = testclient.NewSimpleFake(testPod)
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     err := syncer.syncBatch()
@@ -180,7 +180,7 @@ func TestSyncBatch(t *testing.T) {
 }

 func TestSyncBatchChecksMismatchedUID(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     testPod.UID = "first"
     differentPod := *testPod
     differentPod.UID = "second"
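Note the pattern in newTestManager above: NewManager returns the Manager interface, and the test helper asserts back to the unexported *manager so the tests can reach internals such as syncBatch and the update channel. A hypothetical extra test in this file could build on the same helpers; newTestManager, testPod, and getRandomPodStatus are defined in the surrounding test file, while the test below is illustrative only and not part of the commit.

    func TestDeletePodStatus(t *testing.T) {
        syncer := newTestManager()
        syncer.SetPodStatus(testPod, getRandomPodStatus())
        if _, found := syncer.GetPodStatus(testPod.UID); !found {
            t.Fatalf("expected status for pod %q to be cached", testPod.UID)
        }
        syncer.DeletePodStatus(testPod.UID)
        if _, found := syncer.GetPodStatus(testPod.UID); found {
            t.Errorf("expected status for pod %q to be removed from the cache", testPod.UID)
        }
    }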