mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Add node status update controller
This commit is contained in:
parent
0c1d90bf5f
commit
2f45d5706b
@ -26,6 +26,7 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/node"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
@ -140,6 +141,11 @@ func (s *CMServer) Run(_ []string) error {
|
||||
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
|
||||
nodeController.Run(s.NodeSyncPeriod)
|
||||
|
||||
nodeStatusUpdaterController := node.NewStatusUpdater(kubeClient, s.NodeMonitorPeriod, time.Now)
|
||||
if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil {
|
||||
glog.Fatalf("Failed to start node status update controller: %v", err)
|
||||
}
|
||||
|
||||
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
|
||||
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
const (
|
||||
DefaultInfoID = "k8sm-executor"
|
||||
DefaultInfoSource = "kubernetes"
|
||||
DefaultInfoName = "Kubelet-Executor"
|
||||
DefaultSuicideTimeout = 20 * time.Minute
|
||||
DefaultLaunchGracePeriod = 5 * time.Minute
|
||||
)
|
||||
|
195
contrib/mesos/pkg/node/statusupdater.go
Normal file
195
contrib/mesos/pkg/node/statusupdater.go
Normal file
@ -0,0 +1,195 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package node
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
log "github.com/golang/glog"
|
||||
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
)
|
||||
|
||||
const (
|
||||
nodeStatusUpdateRetry = 5
|
||||
slaveReadyReason = "SlaveReady"
|
||||
slaveReadyMessage = "mesos reports ready status"
|
||||
)
|
||||
|
||||
type StatusUpdater struct {
|
||||
client *client.Client
|
||||
relistPeriod time.Duration
|
||||
heartBeatPeriod time.Duration
|
||||
nowFunc func() time.Time
|
||||
}
|
||||
|
||||
func NewStatusUpdater(client *client.Client, relistPeriod time.Duration, nowFunc func() time.Time) *StatusUpdater {
|
||||
kubecfg := kubelet.NewKubeletServer() // only create to get the config, this is without side-effects
|
||||
return &StatusUpdater{
|
||||
client: client,
|
||||
relistPeriod: relistPeriod,
|
||||
heartBeatPeriod: kubecfg.NodeStatusUpdateFrequency,
|
||||
nowFunc: nowFunc,
|
||||
}
|
||||
}
|
||||
|
||||
func (u *StatusUpdater) Run(terminate <-chan struct{}) error {
|
||||
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
nodeLW := cache.NewListWatchFromClient(u.client, "nodes", api.NamespaceAll, fields.Everything())
|
||||
cache.NewReflector(nodeLW, &api.Node{}, nodeStore, u.relistPeriod).Run()
|
||||
|
||||
monitor := func() {
|
||||
// build up a slave set of nodes without kubelet
|
||||
slavesWithoutKubeletList, err := mesos.CloudProvider.ListWithoutKubelet()
|
||||
if err != nil {
|
||||
log.Errorf("Error while updating slave nodes: %v", err)
|
||||
return
|
||||
}
|
||||
slavesWithoutKubelet := make(map[string]struct{}, len(slavesWithoutKubeletList))
|
||||
for _, s := range slavesWithoutKubeletList {
|
||||
slavesWithoutKubelet[s] = struct{}{}
|
||||
}
|
||||
|
||||
// update status for nodes which do not have a kubelet running and
|
||||
// which are still existing as slave. This status update must be done
|
||||
// before the node controller counts down the NodeMonitorGracePeriod
|
||||
obj, err := nodeLW.List()
|
||||
if err != nil {
|
||||
log.Errorf("Error listing the nodes for status updates: %v", err)
|
||||
}
|
||||
nl, _ := obj.(*api.NodeList)
|
||||
nodes := nl.Items
|
||||
|
||||
for i := range nodes {
|
||||
if _, ok := slavesWithoutKubelet[nodes[i].Spec.ExternalID]; !ok {
|
||||
// let the kubelet do its job updating the status, or the
|
||||
// node controller will remove this node if the node does not even
|
||||
// exist anymore
|
||||
continue
|
||||
}
|
||||
|
||||
err := u.updateStatus(&nodes[i])
|
||||
if err != nil {
|
||||
log.Errorf("Error updating node status: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
go runtime.Until(monitor, u.heartBeatPeriod, terminate)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *StatusUpdater) updateStatus(n *api.Node) error {
|
||||
for i := 0; i < nodeStatusUpdateRetry; i++ {
|
||||
if err := u.tryUpdateStatus(n); err != nil && !errors.IsConflict(err) {
|
||||
log.Errorf("Error updating node status, will retry: %v", err)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("Update node status exceeds retry count")
|
||||
}
|
||||
|
||||
// nodeWithUpdatedStatus clones the given node and updates the NodeReady condition.
|
||||
// The updated node is return and a boolean indicating whether the node was changed
|
||||
// at all.
|
||||
func (u *StatusUpdater) nodeWithUpdatedStatus(n *api.Node) (*api.Node, bool, error) {
|
||||
readyCondition := getCondition(&n.Status, api.NodeReady)
|
||||
currentTime := unversioned.NewTime(u.nowFunc())
|
||||
|
||||
// avoid flapping by waiting at least twice the kubetlet update frequency, i.e.
|
||||
// give the kubelet the chance twice to update the heartbeat. This is necessary
|
||||
// because we only poll the Mesos master state.json once in a while and we
|
||||
// know that that the information from there can easily be outdated.
|
||||
gracePeriod := u.heartBeatPeriod * 2
|
||||
if readyCondition != nil && !currentTime.After(readyCondition.LastHeartbeatTime.Add(gracePeriod)) {
|
||||
return n, false, nil
|
||||
}
|
||||
|
||||
clone, err := api.Scheme.DeepCopy(n)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
n = clone.(*api.Node)
|
||||
|
||||
newNodeReadyCondition := api.NodeCondition{
|
||||
Type: api.NodeReady,
|
||||
Status: api.ConditionTrue,
|
||||
Reason: slaveReadyReason,
|
||||
Message: slaveReadyMessage,
|
||||
LastHeartbeatTime: currentTime,
|
||||
}
|
||||
|
||||
found := false
|
||||
for i := range n.Status.Conditions {
|
||||
c := &n.Status.Conditions[i]
|
||||
if c.Type == api.NodeReady {
|
||||
if c.Status == newNodeReadyCondition.Status {
|
||||
newNodeReadyCondition.LastTransitionTime = c.LastTransitionTime
|
||||
} else {
|
||||
newNodeReadyCondition.LastTransitionTime = currentTime
|
||||
}
|
||||
n.Status.Conditions[i] = newNodeReadyCondition
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
newNodeReadyCondition.LastTransitionTime = currentTime
|
||||
n.Status.Conditions = append(n.Status.Conditions, newNodeReadyCondition)
|
||||
}
|
||||
|
||||
return n, true, nil
|
||||
}
|
||||
|
||||
// tryUpdateStatus updates the status of the given node and tries to persist that
|
||||
// on the apiserver
|
||||
func (u *StatusUpdater) tryUpdateStatus(n *api.Node) error {
|
||||
n, updated, err := u.nodeWithUpdatedStatus(n)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !updated {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = u.client.Nodes().UpdateStatus(n)
|
||||
return err
|
||||
}
|
||||
|
||||
// getCondition returns a condition object for the specific condition
|
||||
// type, nil if the condition is not set.
|
||||
func getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
|
||||
if status == nil {
|
||||
return nil
|
||||
}
|
||||
for i := range status.Conditions {
|
||||
if status.Conditions[i].Type == conditionType {
|
||||
return &status.Conditions[i]
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
77
contrib/mesos/pkg/node/statusupdater_test.go
Normal file
77
contrib/mesos/pkg/node/statusupdater_test.go
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package node
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
|
||||
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
)
|
||||
|
||||
func Test_nodeWithUpdatedStatus(t *testing.T) {
|
||||
now := time.Now()
|
||||
testNode := func(d time.Duration, s api.ConditionStatus, r string) *api.Node {
|
||||
return &api.Node{
|
||||
Status: api.NodeStatus{
|
||||
Conditions: []api.NodeCondition{{
|
||||
Type: api.NodeOutOfDisk,
|
||||
}, {
|
||||
Type: api.NodeReady,
|
||||
Status: s,
|
||||
Reason: r,
|
||||
Message: "some message we don't care about here",
|
||||
LastTransitionTime: unversioned.Time{now.Add(-time.Minute)},
|
||||
LastHeartbeatTime: unversioned.Time{now.Add(d)},
|
||||
}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
cm := app.NewCMServer()
|
||||
kubecfg := kubelet.NewKubeletServer()
|
||||
assert.True(t, kubecfg.NodeStatusUpdateFrequency*3 < cm.NodeMonitorGracePeriod) // sanity check for defaults
|
||||
|
||||
n := testNode(0, api.ConditionTrue, "KubeletReady")
|
||||
su := NewStatusUpdater(nil, cm.NodeMonitorPeriod, func() time.Time { return now })
|
||||
_, updated, err := su.nodeWithUpdatedStatus(n)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, updated, "no update expected b/c kubelet updated heartbeat just now")
|
||||
|
||||
n = testNode(-cm.NodeMonitorGracePeriod, api.ConditionTrue, "KubeletReady")
|
||||
n2, updated, err := su.nodeWithUpdatedStatus(n)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, updated, "update expected b/c kubelet's update is older than DefaultNodeMonitorGracePeriod")
|
||||
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason)
|
||||
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage)
|
||||
|
||||
n = testNode(-kubecfg.NodeStatusUpdateFrequency, api.ConditionTrue, "KubeletReady")
|
||||
n2, updated, err = su.nodeWithUpdatedStatus(n)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, updated, "no update expected b/c kubelet's update was missed only once")
|
||||
|
||||
n = testNode(-kubecfg.NodeStatusUpdateFrequency*3, api.ConditionTrue, "KubeletReady")
|
||||
n2, updated, err = su.nodeWithUpdatedStatus(n)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, updated, "update expected b/c kubelet's update is older than 3*DefaultNodeStatusUpdateFrequency")
|
||||
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason)
|
||||
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage)
|
||||
}
|
@ -70,6 +70,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth"
|
||||
cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
@ -426,7 +427,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
|
||||
// Create mesos scheduler driver.
|
||||
execInfo := &mesos.ExecutorInfo{
|
||||
Command: ci,
|
||||
Name: proto.String(execcfg.DefaultInfoName),
|
||||
Name: proto.String(cloud.KubernetesExecutorName),
|
||||
Source: proto.String(execcfg.DefaultInfoSource),
|
||||
}
|
||||
|
||||
|
@ -249,7 +249,7 @@ func parseMesosState(blob []byte) (*mesosState, error) {
|
||||
// k8s instance in a cluster. At the moment this is not possible for
|
||||
// a number of reasons.
|
||||
// TODO(sttts): find way to detect executors of this k8s instance
|
||||
if e.Name == "Kubelet-Executor" {
|
||||
if e.Name == KubernetesExecutorName {
|
||||
executorSlaveIds[e.SlaveId] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user