From 2f45d5706b15ccc0b5681128170fa0e1ff92b55f Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 22 Oct 2015 14:48:49 -0700 Subject: [PATCH] Add node status update controller --- .../controllermanager/controllermanager.go | 6 + contrib/mesos/pkg/executor/config/config.go | 1 - contrib/mesos/pkg/node/statusupdater.go | 195 ++++++++++++++++++ contrib/mesos/pkg/node/statusupdater_test.go | 77 +++++++ .../mesos/pkg/scheduler/service/service.go | 3 +- pkg/cloudprovider/providers/mesos/client.go | 2 +- 6 files changed, 281 insertions(+), 3 deletions(-) create mode 100644 contrib/mesos/pkg/node/statusupdater.go create mode 100644 contrib/mesos/pkg/node/statusupdater_test.go diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 055d6876e84..6c3e6a39189 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -26,6 +26,7 @@ import ( "time" "k8s.io/kubernetes/cmd/kube-controller-manager/app" + "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" @@ -140,6 +141,11 @@ func (s *CMServer) Run(_ []string) error { s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod) + nodeStatusUpdaterController := node.NewStatusUpdater(kubeClient, s.NodeMonitorPeriod, time.Now) + if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil { + glog.Fatalf("Failed to start node status update controller: %v", err) + } + serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName) if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil { glog.Errorf("Failed to start service controller: %v", err) diff --git 
a/contrib/mesos/pkg/executor/config/config.go b/contrib/mesos/pkg/executor/config/config.go index e61ce500f26..55af9fe97fb 100644 --- a/contrib/mesos/pkg/executor/config/config.go +++ b/contrib/mesos/pkg/executor/config/config.go @@ -24,7 +24,6 @@ import ( const ( DefaultInfoID = "k8sm-executor" DefaultInfoSource = "kubernetes" - DefaultInfoName = "Kubelet-Executor" DefaultSuicideTimeout = 20 * time.Minute DefaultLaunchGracePeriod = 5 * time.Minute ) diff --git a/contrib/mesos/pkg/node/statusupdater.go b/contrib/mesos/pkg/node/statusupdater.go new file mode 100644 index 00000000000..c9d8172473c --- /dev/null +++ b/contrib/mesos/pkg/node/statusupdater.go @@ -0,0 +1,195 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "fmt" + "time" + + log "github.com/golang/glog" + kubelet "k8s.io/kubernetes/cmd/kubelet/app" + "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" + "k8s.io/kubernetes/pkg/fields" +) + +const ( + nodeStatusUpdateRetry = 5 + slaveReadyReason = "SlaveReady" + slaveReadyMessage = "mesos reports ready status" +) + +type StatusUpdater struct { + client *client.Client + relistPeriod time.Duration + heartBeatPeriod time.Duration + nowFunc func() time.Time +} + +func NewStatusUpdater(client *client.Client, relistPeriod time.Duration, nowFunc func() time.Time) *StatusUpdater { + kubecfg := kubelet.NewKubeletServer() // only create to get the config, this is without side-effects + return &StatusUpdater{ + client: client, + relistPeriod: relistPeriod, + heartBeatPeriod: kubecfg.NodeStatusUpdateFrequency, + nowFunc: nowFunc, + } +} + +func (u *StatusUpdater) Run(terminate <-chan struct{}) error { + nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodeLW := cache.NewListWatchFromClient(u.client, "nodes", api.NamespaceAll, fields.Everything()) + cache.NewReflector(nodeLW, &api.Node{}, nodeStore, u.relistPeriod).Run() + + monitor := func() { + // build up a slave set of nodes without kubelet + slavesWithoutKubeletList, err := mesos.CloudProvider.ListWithoutKubelet() + if err != nil { + log.Errorf("Error while updating slave nodes: %v", err) + return + } + slavesWithoutKubelet := make(map[string]struct{}, len(slavesWithoutKubeletList)) + for _, s := range slavesWithoutKubeletList { + slavesWithoutKubelet[s] = struct{}{} + } + + // update status for nodes which do not have a kubelet running and + // which are still existing as slave. 
This status update must be done + // before the node controller counts down the NodeMonitorGracePeriod + obj, err := nodeLW.List() + if err != nil { + log.Errorf("Error listing the nodes for status updates: %v", err) + } + nl, _ := obj.(*api.NodeList) + nodes := nl.Items + + for i := range nodes { + if _, ok := slavesWithoutKubelet[nodes[i].Spec.ExternalID]; !ok { + // let the kubelet do its job updating the status, or the + // node controller will remove this node if the node does not even + // exist anymore + continue + } + + err := u.updateStatus(&nodes[i]) + if err != nil { + log.Errorf("Error updating node status: %v", err) + } + } + } + go runtime.Until(monitor, u.heartBeatPeriod, terminate) + + return nil +} + +func (u *StatusUpdater) updateStatus(n *api.Node) error { + for i := 0; i < nodeStatusUpdateRetry; i++ { + if err := u.tryUpdateStatus(n); err != nil && !errors.IsConflict(err) { + log.Errorf("Error updating node status, will retry: %v", err) + } else { + return nil + } + } + return fmt.Errorf("Update node status exceeds retry count") +} + +// nodeWithUpdatedStatus clones the given node and updates the NodeReady condition. +// The updated node is returned and a boolean indicating whether the node was changed +// at all. +func (u *StatusUpdater) nodeWithUpdatedStatus(n *api.Node) (*api.Node, bool, error) { + readyCondition := getCondition(&n.Status, api.NodeReady) + currentTime := unversioned.NewTime(u.nowFunc()) + + // avoid flapping by waiting at least twice the kubelet update frequency, i.e. + // give the kubelet the chance twice to update the heartbeat. This is necessary + // because we only poll the Mesos master state.json once in a while and we + // know that the information from there can easily be outdated. 
+ gracePeriod := u.heartBeatPeriod * 2 + if readyCondition != nil && !currentTime.After(readyCondition.LastHeartbeatTime.Add(gracePeriod)) { + return n, false, nil + } + + clone, err := api.Scheme.DeepCopy(n) + if err != nil { + return nil, false, err + } + n = clone.(*api.Node) + + newNodeReadyCondition := api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: slaveReadyReason, + Message: slaveReadyMessage, + LastHeartbeatTime: currentTime, + } + + found := false + for i := range n.Status.Conditions { + c := &n.Status.Conditions[i] + if c.Type == api.NodeReady { + if c.Status == newNodeReadyCondition.Status { + newNodeReadyCondition.LastTransitionTime = c.LastTransitionTime + } else { + newNodeReadyCondition.LastTransitionTime = currentTime + } + n.Status.Conditions[i] = newNodeReadyCondition + found = true + break + } + } + + if !found { + newNodeReadyCondition.LastTransitionTime = currentTime + n.Status.Conditions = append(n.Status.Conditions, newNodeReadyCondition) + } + + return n, true, nil +} + +// tryUpdateStatus updates the status of the given node and tries to persist that +// on the apiserver +func (u *StatusUpdater) tryUpdateStatus(n *api.Node) error { + n, updated, err := u.nodeWithUpdatedStatus(n) + if err != nil { + return err + } + if !updated { + return nil + } + + _, err = u.client.Nodes().UpdateStatus(n) + return err +} + +// getCondition returns a condition object for the specific condition +// type, nil if the condition is not set. 
+func getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition { + if status == nil { + return nil + } + for i := range status.Conditions { + if status.Conditions[i].Type == conditionType { + return &status.Conditions[i] + } + } + return nil +} diff --git a/contrib/mesos/pkg/node/statusupdater_test.go b/contrib/mesos/pkg/node/statusupdater_test.go new file mode 100644 index 00000000000..ea3b831802e --- /dev/null +++ b/contrib/mesos/pkg/node/statusupdater_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/cmd/kube-controller-manager/app" + kubelet "k8s.io/kubernetes/cmd/kubelet/app" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" +) + +func Test_nodeWithUpdatedStatus(t *testing.T) { + now := time.Now() + testNode := func(d time.Duration, s api.ConditionStatus, r string) *api.Node { + return &api.Node{ + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{{ + Type: api.NodeOutOfDisk, + }, { + Type: api.NodeReady, + Status: s, + Reason: r, + Message: "some message we don't care about here", + LastTransitionTime: unversioned.Time{now.Add(-time.Minute)}, + LastHeartbeatTime: unversioned.Time{now.Add(d)}, + }}, + }, + } + } + + cm := app.NewCMServer() + kubecfg := kubelet.NewKubeletServer() + assert.True(t, kubecfg.NodeStatusUpdateFrequency*3 < cm.NodeMonitorGracePeriod) // sanity check for defaults + + n := testNode(0, api.ConditionTrue, "KubeletReady") + su := NewStatusUpdater(nil, cm.NodeMonitorPeriod, func() time.Time { return now }) + _, updated, err := su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.False(t, updated, "no update expected b/c kubelet updated heartbeat just now") + + n = testNode(-cm.NodeMonitorGracePeriod, api.ConditionTrue, "KubeletReady") + n2, updated, err := su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.True(t, updated, "update expected b/c kubelet's update is older than DefaultNodeMonitorGracePeriod") + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason) + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage) + + n = testNode(-kubecfg.NodeStatusUpdateFrequency, api.ConditionTrue, "KubeletReady") + n2, updated, err = su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.False(t, updated, "no update expected b/c kubelet's update was missed only once") + + n = testNode(-kubecfg.NodeStatusUpdateFrequency*3, 
api.ConditionTrue, "KubeletReady") + n2, updated, err = su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.True(t, updated, "update expected b/c kubelet's update is older than 3*DefaultNodeStatusUpdateFrequency") + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason) + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage) +} diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 244f7e6a91e..6e08417dabd 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -70,6 +70,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth" + cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/master/ports" @@ -426,7 +427,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E // Create mesos scheduler driver. execInfo := &mesos.ExecutorInfo{ Command: ci, - Name: proto.String(execcfg.DefaultInfoName), + Name: proto.String(cloud.KubernetesExecutorName), Source: proto.String(execcfg.DefaultInfoSource), } diff --git a/pkg/cloudprovider/providers/mesos/client.go b/pkg/cloudprovider/providers/mesos/client.go index 70d8ada92c3..26426ef5568 100644 --- a/pkg/cloudprovider/providers/mesos/client.go +++ b/pkg/cloudprovider/providers/mesos/client.go @@ -249,7 +249,7 @@ func parseMesosState(blob []byte) (*mesosState, error) { // k8s instance in a cluster. At the moment this is not possible for // a number of reasons. // TODO(sttts): find way to detect executors of this k8s instance - if e.Name == "Kubelet-Executor" { + if e.Name == KubernetesExecutorName { executorSlaveIds[e.SlaveId] = struct{}{} } }