Move NodeController constants to flags

gmarek 2015-03-31 13:17:12 +02:00
parent d6851729d2
commit 321a81047c
7 changed files with 118 additions and 68 deletions

View File

@@ -223,8 +223,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
     api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
   }}
-  nodeController := nodeControllerPkg.NewNodeController(
-    nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter())
+  nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter(), 8*time.Second, 30*time.Second, 5*time.Second)
   nodeController.Run(5*time.Second, true, false)
   cadvisorInterface := new(cadvisor.Fake)

View File

@@ -57,6 +57,10 @@ type CMServer struct {
   MachineList util.StringList
   SyncNodeList bool
   SyncNodeStatus bool
+  NodeMonitorGracePeriod time.Duration
+  NodeStartupGracePeriod time.Duration
+  NodeMonitorPeriod time.Duration
+  NodeStatusUpdateRetry int
   PodEvictionTimeout time.Duration
   DeletingPodsQps float32
   DeletingPodsBurst int
@@ -115,6 +119,10 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
   fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, ""+
     "If true, node controller sends probes to kubelet and updates NodeStatus."+
     "If false, Kubelet posts NodeStatus to API server.")
+  fs.DurationVar(&s.NodeMonitorGracePeriod, "node_monitor_grace_period", 8*time.Second, "Amount of time which we allow running Node to be unresponsive before marking it unhealthy. "+
+    "Must be N times more than kubelet's nodeStatusUpdateFrequency, where N means number of retries allowed for kubelet to post node status.")
+  fs.DurationVar(&s.NodeStartupGracePeriod, "node_startup_grace_period", 30*time.Second, "Amount of time which we allow starting Node to be unresponsive before marking it unhealthy.")
+  fs.DurationVar(&s.NodeMonitorPeriod, "node_monitor_period", 5*time.Second, "The period for syncing NodeStatus in NodeController.")
   // TODO: Discover these by pinging the host machines, and rip out these flags.
   // TODO: in the meantime, use resource.QuantityFlag() instead of these
   fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
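The new flags encode a timing contract with the kubelet: the monitoring grace period has to cover several kubelet status posts yet stay below the pod eviction timeout. A standalone sketch of that check, assuming a hypothetical validateNodeTimings helper and illustrative values (not part of this commit):

// Standalone sketch; validateNodeTimings and the values in main are illustrative only.
package main

import (
	"fmt"
	"time"
)

// validateNodeTimings checks the relationship described in the flag help text:
// node_monitor_grace_period should be at least allowedRetries times the kubelet's
// node_status_update_frequency and must stay below pod_eviction_timeout.
func validateNodeTimings(monitorGracePeriod, statusUpdateFrequency, podEvictionTimeout time.Duration, allowedRetries int) error {
	minGrace := time.Duration(allowedRetries) * statusUpdateFrequency
	if monitorGracePeriod < minGrace {
		return fmt.Errorf("node_monitor_grace_period %v is shorter than %d * node_status_update_frequency = %v",
			monitorGracePeriod, allowedRetries, minGrace)
	}
	if monitorGracePeriod >= podEvictionTimeout {
		return fmt.Errorf("node_monitor_grace_period %v must be less than pod_eviction_timeout %v",
			monitorGracePeriod, podEvictionTimeout)
	}
	return nil
}

func main() {
	// Illustrative values: 2s kubelet posting frequency, 4 tolerated misses,
	// 8s grace period, 5m eviction timeout.
	if err := validateNodeTimings(8*time.Second, 2*time.Second, 5*time.Minute, 4); err != nil {
		fmt.Println("timing check failed:", err)
	} else {
		fmt.Println("timings are consistent")
	}
}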
@@ -181,7 +189,8 @@ func (s *CMServer) Run(_ []string) error {
   }
   nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
-    kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst))
+    kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
+    s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod)
   nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)
   resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)

View File

@@ -94,6 +94,7 @@ type KubeletServer struct {
   TLSCertFile string
   TLSPrivateKeyFile string
   CertDirectory string
+  NodeStatusUpdateFrequency time.Duration
 }

 // bootstrapping interface for kubelet, targets the initialization protocol
@@ -137,6 +138,7 @@ func NewKubeletServer() *KubeletServer {
   NetworkPluginName: "",
   HostNetworkSources: kubelet.FileSource,
   CertDirectory: "/var/run/kubernetes",
+  NodeStatusUpdateFrequency: 2 * time.Second,
   }
 }
@@ -181,6 +183,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
   fs.Var(&s.ClusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
   fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, crash with panics more often.")
   fs.DurationVar(&s.StreamingConnectionIdleTimeout, "streaming_connection_idle_timeout", 0, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'")
+  fs.DurationVar(&s.NodeStatusUpdateFrequency, "node_status_update_frequency", s.NodeStatusUpdateFrequency, "Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 2s")
   fs.IntVar(&s.ImageGCHighThresholdPercent, "image_gc_high_threshold", s.ImageGCHighThresholdPercent, "The percent of disk usage after which image garbage collection is always run. Default: 90%%")
   fs.IntVar(&s.ImageGCLowThresholdPercent, "image_gc_low_threshold", s.ImageGCLowThresholdPercent, "The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Default: 80%%")
   fs.StringVar(&s.NetworkPluginName, "network_plugin", s.NetworkPluginName, "<Warning: Alpha feature> The name of the network plugin to be invoked for various events in kubelet/pod lifecycle")
@@ -381,6 +384,7 @@ func SimpleKubelet(client *client.Client,
   ConfigFile: configFilePath,
   ImageGCPolicy: imageGCPolicy,
   Cloud: cloud,
+  NodeStatusUpdateFrequency: 2 * time.Second,
   }
   return &kcfg
 }
@@ -501,6 +505,7 @@ type KubeletConfig struct {
   TLSOptions *kubelet.TLSOptions
   ImageGCPolicy kubelet.ImageGCPolicy
   Cloud cloudprovider.Interface
+  NodeStatusUpdateFrequency time.Duration
 }

 func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
@@ -542,7 +547,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
   kc.Recorder,
   kc.CadvisorInterface,
   kc.ImageGCPolicy,
-  kc.Cloud)
+  kc.Cloud,
+  kc.NodeStatusUpdateFrequency)
   if err != nil {
     return nil, nil, err

View File

@@ -131,7 +131,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
   kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
   nodeController := nodeControllerPkg.NewNodeController(
-    nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst))
+    nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 8*time.Second, 30*time.Second, 5*time.Second)
   nodeController.Run(10*time.Second, true, true)
   endpoints := service.NewEndpointController(cl)

View File

@@ -34,40 +34,17 @@ import (
   "github.com/golang/glog"
 )
-const (
-  // The constant is used if sync_nodes_status=False. NodeController will not proactively
-  // sync node status in this case, but will monitor node status updated from kubelet. If
-  // it doesn't receive update for this amount of time, it will start posting "NodeReady==
-  // ConditionUnknown". The amount of time before which NodeController start evicting pods
-  // is controlled via flag 'pod_eviction_timeout'.
-  // Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
-  // in kubelet. There are several constraints:
-  // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
-  //    N means number of retries allowed for kubelet to post node status. It is pointless
-  //    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
-  //    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
-  //    The constant must be less than podEvictionTimeout.
-  // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
-  //    longer for user to see up-to-date node status.
-  nodeMonitorGracePeriod = 8 * time.Second
-  // The constant is used if sync_nodes_status=False, only for node startup. When node
-  // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
-  nodeStartupGracePeriod = 30 * time.Second
-  // The constant is used if sync_nodes_status=False. It controls NodeController monitoring
-  // period, i.e. how often does NodeController check node status posted from kubelet.
-  // Theoretically, this value should be lower than nodeMonitorGracePeriod.
-  // TODO: Change node status monitor to watch based.
-  nodeMonitorPeriod = 5 * time.Second
-  // Constant controlling number of retries of writing NodeStatus update.
-  nodeStatusUpdateRetry = 5
-)
 var (
   ErrRegistration = errors.New("unable to register all nodes.")
   ErrQueryIPAddress = errors.New("unable to query IP address.")
   ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
 )
+const (
+  // Constant controlling number of retries of writing NodeStatus update.
+  nodeStatusUpdateRetry = 5
+)
 type NodeStatusData struct {
   probeTimestamp util.Time
   readyTransitionTimestamp util.Time
@@ -88,6 +65,28 @@ type NodeController struct {
   // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
   // to aviod the problem with time skew across the cluster.
   nodeStatusMap map[string]NodeStatusData
+  // Value used if sync_nodes_status=False. NodeController will not proactively
+  // sync node status in this case, but will monitor node status updated from kubelet. If
+  // it doesn't receive update for this amount of time, it will start posting "NodeReady==
+  // ConditionUnknown". The amount of time before which NodeController start evicting pods
+  // is controlled via flag 'pod_eviction_timeout'.
+  // Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
+  // in kubelet. There are several constraints:
+  // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
+  //    N means number of retries allowed for kubelet to post node status. It is pointless
+  //    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
+  //    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
+  //    The constant must be less than podEvictionTimeout.
+  // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
+  //    longer for user to see up-to-date node status.
+  nodeMonitorGracePeriod time.Duration
+  // Value used if sync_nodes_status=False, only for node startup. When node
+  // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
+  nodeStartupGracePeriod time.Duration
+  // Value controlling NodeController monitoring period, i.e. how often does NodeController
+  // check node status posted from kubelet. Theoretically, this value should be lower than nodeMonitorGracePeriod.
+  // TODO: Change node status monitor to watch based.
+  nodeMonitorPeriod time.Duration
   // Method for easy mocking in unittest.
   lookupIP func(host string) ([]net.IP, error)
   now func() util.Time
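The field comments above boil down to one staleness decision: which grace period applies, and has it elapsed since the last probe. A minimal standalone sketch of that decision, assuming a hypothetical statusIsStale helper (the real logic lives in tryUpdateNodeStatus below):

// Standalone sketch; statusIsStale and its parameters are illustrative names.
package main

import (
	"fmt"
	"time"
)

// statusIsStale reports whether the last status the controller has for a node
// is older than the applicable grace period: the longer startup grace period
// when the kubelet has never posted a status, the regular monitoring grace
// period otherwise.
func statusIsStale(now, lastProbe time.Time, hasPostedStatus bool, monitorGrace, startupGrace time.Duration) bool {
	gracePeriod := monitorGrace
	if !hasPostedStatus {
		gracePeriod = startupGrace
	}
	return now.After(lastProbe.Add(gracePeriod))
}

func main() {
	now := time.Now()
	// A node that last posted status 10s ago with an 8s monitoring grace
	// period is considered stale; a freshly created node is still covered
	// by the 30s startup grace period.
	fmt.Println(statusIsStale(now, now.Add(-10*time.Second), true, 8*time.Second, 30*time.Second))  // true
	fmt.Println(statusIsStale(now, now.Add(-10*time.Second), false, 8*time.Second, 30*time.Second)) // false
}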
@@ -103,7 +102,10 @@ func NewNodeController(
   kubeletClient client.KubeletClient,
   registerRetryCount int,
   podEvictionTimeout time.Duration,
-  deletingPodsRateLimiter util.RateLimiter) *NodeController {
+  deletingPodsRateLimiter util.RateLimiter,
+  nodeMonitorGracePeriod time.Duration,
+  nodeStartupGracePeriod time.Duration,
+  nodeMonitorPeriod time.Duration) *NodeController {
   return &NodeController{
     cloud: cloud,
     matchRE: matchRE,
@@ -115,6 +117,9 @@ func NewNodeController(
     podEvictionTimeout: podEvictionTimeout,
     deletingPodsRateLimiter: deletingPodsRateLimiter,
     nodeStatusMap: make(map[string]NodeStatusData),
+    nodeMonitorGracePeriod: nodeMonitorGracePeriod,
+    nodeMonitorPeriod: nodeMonitorPeriod,
+    nodeStartupGracePeriod: nodeStartupGracePeriod,
     lookupIP: net.LookupIP,
     now: util.Now,
   }
@@ -177,7 +182,7 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus
     if err := nc.MonitorNodeStatus(); err != nil {
       glog.Errorf("Error monitoring node status: %v", err)
     }
-  }, nodeMonitorPeriod)
+  }, nc.nodeMonitorPeriod)
   }
 }
@@ -463,7 +468,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
     LastProbeTime: node.CreationTimestamp,
     LastTransitionTime: node.CreationTimestamp,
   }
-  gracePeriod = nodeStartupGracePeriod
+  gracePeriod = nc.nodeStartupGracePeriod
   nc.nodeStatusMap[node.Name] = NodeStatusData{
     status: node.Status,
     probeTimestamp: node.CreationTimestamp,
@@ -472,7 +477,7 @@
   } else {
   // If ready condition is not nil, make a copy of it, since we may modify it in place later.
   lastReadyCondition = *readyCondition
-  gracePeriod = nodeMonitorGracePeriod
+  gracePeriod = nc.nodeMonitorGracePeriod
   }
   savedNodeStatus, found := nc.nodeStatusMap[node.Name]
@@ -615,6 +620,7 @@ func (nc *NodeController) MonitorNodeStatus() error {
   nc.now().After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
   // Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
   // Makes sure we are not removing pods from to many nodes in the same time.
+  glog.Infof("Evicting pods: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
   if nc.deletingPodsRateLimiter.CanAccept() {
     nc.deletePods(node.Name)
   }
@@ -623,6 +629,7 @@ func (nc *NodeController) MonitorNodeStatus() error {
   nc.now().After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
   // Same as above. Note however, since condition unknown is posted by node controller, which means we
   // need to substract monitoring grace period in order to get the real 'podEvictionTimeout'.
+  glog.Infof("Evicting pods2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
   if nc.deletingPodsRateLimiter.CanAccept() {
     nc.deletePods(node.Name)
   }
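The two eviction branches above measure the same podEvictionTimeout from different reference points: the ready-condition transition when the kubelet itself reported NotReady, and the last probe minus the grace period when the controller had to post ConditionUnknown. A standalone sketch of that decision, assuming a hypothetical shouldEvict helper and illustrative values (not part of this commit):

// Standalone sketch; shouldEvict and its parameters are illustrative names.
package main

import (
	"fmt"
	"time"
)

// shouldEvict reports whether pods on a node are past their eviction deadline.
// If the kubelet itself reported NotReady, the deadline is measured from the
// ready-condition transition time. If the controller posted ConditionUnknown,
// it only noticed the silence after gracePeriod, so gracePeriod is subtracted
// to keep the effective timeout equal to podEvictionTimeout.
func shouldEvict(now, readyTransition, lastProbe time.Time, kubeletReportedNotReady bool, podEvictionTimeout, gracePeriod time.Duration) bool {
	if kubeletReportedNotReady {
		return now.After(readyTransition.Add(podEvictionTimeout))
	}
	return now.After(lastProbe.Add(podEvictionTimeout - gracePeriod))
}

func main() {
	now := time.Now()
	// With a 5m eviction timeout and an 8s grace period, a node that the
	// controller marked Unknown 5m after its last probe is due for eviction.
	fmt.Println(shouldEvict(now, now.Add(-5*time.Minute), now.Add(-5*time.Minute), false, 5*time.Minute, 8*time.Second)) // true
}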

View File

@@ -38,6 +38,12 @@ import (
   "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
+const (
+  testNodeMonitorGracePeriod = 8 * time.Second
+  testNodeStartupGracePeriod = 30 * time.Second
+  testNodeMonitorPeriod = 5 * time.Second
+)
 // FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
 // allows test cases to have fine-grained control over mock behaviors. We also need
 // PodsInterface and PodInterface to test list & delet pods, which is implemented in
@@ -247,7 +253,8 @@ func TestRegisterNodes(t *testing.T) {
   for _, machine := range item.machines {
     nodes.Items = append(nodes.Items, *newNode(machine))
   }
-  nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
   if !item.expectedFail && err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -332,7 +339,8 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
   },
   }
   for _, item := range table {
-  nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodes, err := nodeController.GetStaticNodesWithSpec()
   if err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -393,7 +401,8 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodes, err := nodeController.GetCloudNodesWithSpec()
   if err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -499,7 +508,8 @@ func TestSyncCloudNodes(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   if err := nodeController.SyncCloudNodes(); err != nil {
     t.Errorf("unexpected error: %v", err)
   }
@@ -585,7 +595,8 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   if err := nodeController.SyncCloudNodes(); err != nil {
     t.Errorf("unexpected error: %v", err)
   }
@@ -687,7 +698,8 @@ func TestNodeConditionsCheck(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodeController.now = func() util.Time { return fakeNow }
   conditions := nodeController.DoCheck(item.node)
   if !reflect.DeepEqual(item.expectedConditions, conditions) {
@@ -718,7 +730,8 @@ func TestPopulateNodeAddresses(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   result, err := nodeController.PopulateAddresses(item.nodes)
   // In case of IP querying error, we should continue.
   if err != nil {
@@ -821,7 +834,8 @@ func TestSyncProbedNodeStatus(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodeController.now = func() util.Time { return fakeNow }
   if err := nodeController.SyncProbedNodeStatus(); err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -924,7 +938,8 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
   nodeController.now = func() util.Time { return fakeNow }
   if err := nodeController.SyncProbedNodeStatus(); err != nil {
@@ -1077,7 +1092,8 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute,
+    util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
   if err := nodeController.SyncProbedNodeStatus(); err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -1093,13 +1109,14 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
 func TestMonitorNodeStatusEvictPods(t *testing.T) {
   fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
-  evictionTimeout := 30 * time.Minute
+  evictionTimeout := 10 * time.Minute
   table := []struct {
     fakeNodeHandler *FakeNodeHandler
     timeToPass time.Duration
     newNodeStatus api.NodeStatus
     expectedEvictPods bool
+    description string
   }{
   // Node created recently, with no status (happens only at cluster startup).
   {
@@ -1119,6 +1136,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
     timeToPass: 0,
     newNodeStatus: api.NodeStatus{},
     expectedEvictPods: false,
+    description: "Node created recently, with no status.",
   },
   // Node created long time ago, and kubelet posted NotReady for a short period of time.
   {
@@ -1145,7 +1163,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
       PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
     },
     },
-    timeToPass: 10 * time.Minute,
+    timeToPass: evictionTimeout,
     newNodeStatus: api.NodeStatus{
       Conditions: []api.NodeCondition{
         {
@@ -1158,6 +1176,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
      },
     },
     expectedEvictPods: false,
+    description: "Node created long time ago, and kubelet posted NotReady for a short period of time.",
   },
   // Node created long time ago, and kubelet posted NotReady for a long period of time.
   {
@@ -1197,6 +1216,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
      },
     },
     expectedEvictPods: true,
+    description: "Node created long time ago, and kubelet posted NotReady for a long period of time.",
   },
   // Node created long time ago, node controller posted Unknown for a short period of time.
   {
@@ -1223,7 +1243,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
      PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
     },
     },
-    timeToPass: 10 * time.Minute,
+    timeToPass: evictionTimeout - testNodeMonitorGracePeriod,
     newNodeStatus: api.NodeStatus{
      Conditions: []api.NodeCondition{
       {
@@ -1236,6 +1256,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
      },
     },
     expectedEvictPods: false,
+    description: "Node created long time ago, node controller posted Unknown for a short period of time.",
   },
   // Node created long time ago, node controller posted Unknown for a long period of time.
   {
@@ -1275,11 +1296,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
    },
    },
    expectedEvictPods: true,
+   description: "Node created long time ago, node controller posted Unknown for a long period of time.",
   },
   }
   for _, item := range table {
-  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, evictionTimeout, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10,
+    evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
+    testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodeController.now = func() util.Time { return fakeNow }
   if err := nodeController.MonitorNodeStatus(); err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -1298,7 +1322,8 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
    }
   }
   if item.expectedEvictPods != podEvicted {
-   t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted)
+   t.Errorf("expected pod eviction: %+v, got %+v for %+v", item.expectedEvictPods,
+     podEvicted, item.description)
   }
   }
 }
@@ -1487,7 +1512,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
   }
   for _, item := range table {
-  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter())
+  nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter(),
+    testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
   nodeController.now = func() util.Time { return fakeNow }
   if err := nodeController.MonitorNodeStatus(); err != nil {
     t.Errorf("unexpected error: %v", err)

View File

@@ -72,18 +72,6 @@ const (
   initialNodeStatusUpdateFrequency = 100 * time.Millisecond
   nodeStatusUpdateFrequencyInc = 500 * time.Millisecond
-  // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
-  // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
-  // in nodecontroller. There are several constraints:
-  // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
-  //    N means number of retries allowed for kubelet to post node status. It is pointless
-  //    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
-  //    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
-  //    The constant must be less than podEvictionTimeout.
-  // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
-  //    status. Kubelet may fail to update node status reliablly if the value is too small,
-  //    as it takes time to gather all necessary node information.
-  nodeStatusUpdateFrequency = 2 * time.Second
   // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
   nodeStatusUpdateRetry = 5
 )
@@ -134,7 +122,8 @@ func NewMainKubelet(
   recorder record.EventRecorder,
   cadvisorInterface cadvisor.Interface,
   imageGCPolicy ImageGCPolicy,
-  cloud cloudprovider.Interface) (*Kubelet, error) {
+  cloud cloudprovider.Interface,
+  nodeStatusUpdateFrequency time.Duration) (*Kubelet, error) {
   if rootDirectory == "" {
     return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
   }
@@ -243,6 +232,7 @@ func NewMainKubelet(
     cloud: cloud,
     nodeRef: nodeRef,
     containerManager: containerManager,
+    nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
   }
   klet.podManager = newBasicPodManager(klet.kubeClient)
@@ -367,6 +357,19 @@ type Kubelet struct {
   // Manage containers.
   containerManager *dockertools.DockerManager
+  // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
+  // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
+  // in nodecontroller. There are several constraints:
+  // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
+  //    N means number of retries allowed for kubelet to post node status. It is pointless
+  //    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
+  //    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
+  //    The constant must be less than podEvictionTimeout.
+  // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
+  //    status. Kubelet may fail to update node status reliablly if the value is too small,
+  //    as it takes time to gather all necessary node information.
+  nodeStatusUpdateFrequency time.Duration
 }
 // getRootDir returns the full path to the directory under which kubelet can
@@ -559,7 +562,7 @@ func (kl *Kubelet) syncNodeStatus() {
     return
   }
-  for feq := initialNodeStatusUpdateFrequency; feq < nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
+  for feq := initialNodeStatusUpdateFrequency; feq < kl.nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
     select {
     case <-time.After(feq):
       if err := kl.updateNodeStatus(); err != nil {
@@ -569,7 +572,7 @@ func (kl *Kubelet) syncNodeStatus() {
   }
   for {
     select {
-    case <-time.After(nodeStatusUpdateFrequency):
+    case <-time.After(kl.nodeStatusUpdateFrequency):
      if err := kl.updateNodeStatus(); err != nil {
        glog.Errorf("Unable to update node status: %v", err)
      }
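The loop above ramps the posting interval up from initialNodeStatusUpdateFrequency before settling at the configured kl.nodeStatusUpdateFrequency. A standalone illustration (not kubelet code) of the schedule this produces with the commit's defaults of 100ms initial frequency, 500ms increment, and 2s steady-state frequency:

// Standalone illustration of the startup ramp-up schedule; values mirror the defaults above.
package main

import (
	"fmt"
	"time"
)

func main() {
	initial := 100 * time.Millisecond
	inc := 500 * time.Millisecond
	steady := 2 * time.Second

	// Startup phase: post status at increasing intervals until the
	// configured frequency is reached (100ms, 600ms, 1.1s, 1.6s here).
	for freq := initial; freq < steady; freq += inc {
		fmt.Println("startup post after", freq)
	}
	// Steady state: post every nodeStatusUpdateFrequency thereafter.
	fmt.Println("then post every", steady)
}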