Merge pull request #5399 from ddysher/nc-keep-sync
Node controller monitor node status
commit 3f57378972
@@ -107,7 +107,9 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 		"The number of retries for initial node registration. Retry interval equals node_sync_period.")
 	fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
 	fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
-	fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controler send probes to kubelets and update NodeStatus.")
+	fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, ""+
+		"If true, node controller sends probes to kubelet and updates NodeStatus."+
+		"If false, Kubelet posts NodeStatus to API server.")
 	// TODO: Discover these by pinging the host machines, and rip out these flags.
 	// TODO: in the meantime, use resource.QuantityFlag() instead of these
 	fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
@@ -51,7 +51,6 @@ type KubeletServer struct {
 	SyncFrequency time.Duration
 	FileCheckFrequency time.Duration
 	HTTPCheckFrequency time.Duration
-	StatusUpdateFrequency time.Duration
 	ManifestURL string
 	EnableServer bool
 	Address util.IP
@@ -85,13 +84,12 @@ type KubeletServer struct {
 // NewKubeletServer will create a new KubeletServer with default values.
 func NewKubeletServer() *KubeletServer {
 	return &KubeletServer{
 		SyncFrequency: 10 * time.Second,
 		FileCheckFrequency: 20 * time.Second,
 		HTTPCheckFrequency: 20 * time.Second,
-		StatusUpdateFrequency: 20 * time.Second,
 		EnableServer: true,
 		Address: util.IP(net.ParseIP("127.0.0.1")),
 		Port: ports.KubeletPort,
 		PodInfraContainerImage: kubelet.PodInfraContainerImage,
 		RootDirectory: defaultRootDir,
 		RegistryBurst: 10,
@@ -112,7 +110,6 @@ func NewKubeletServer() *KubeletServer {
 func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
 	fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
-	fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master")
 	fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
 	fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
 	fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
@@ -179,7 +176,6 @@ func (s *KubeletServer) Run(_ []string) error {
 		RootDirectory: s.RootDirectory,
 		ConfigFile: s.Config,
 		ManifestURL: s.ManifestURL,
-		StatusUpdateFrequency: s.StatusUpdateFrequency,
 		FileCheckFrequency: s.FileCheckFrequency,
 		HTTPCheckFrequency: s.HTTPCheckFrequency,
 		PodInfraContainerImage: s.PodInfraContainerImage,
@@ -285,7 +281,6 @@ func SimpleKubelet(client *client.Client,
 		EnableDebuggingHandlers: true,
 		HTTPCheckFrequency: 1 * time.Second,
 		FileCheckFrequency: 1 * time.Second,
-		StatusUpdateFrequency: 3 * time.Second,
 		SyncFrequency: 3 * time.Second,
 		MinimumGCAge: 10 * time.Second,
 		MaxPerPodContainerCount: 5,
@@ -380,7 +375,6 @@ type KubeletConfig struct {
 	RootDirectory string
 	ConfigFile string
 	ManifestURL string
-	StatusUpdateFrequency time.Duration
 	FileCheckFrequency time.Duration
 	HTTPCheckFrequency time.Duration
 	Hostname string
@@ -446,7 +440,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
 		kc.StreamingConnectionIdleTimeout,
 		kc.Recorder,
 		kc.CadvisorInterface,
-		kc.StatusUpdateFrequency,
 		kc.ImageGCPolicy)
 
 	if err != nil {
@@ -34,6 +34,32 @@ import (
 	"github.com/golang/glog"
 )
 
+const (
+	// The constant is used if sync_nodes_status=False. NodeController will not proactively
+	// sync node status in this case, but will monitor node status updated from kubelet. If
+	// it doesn't receive update for this amount of time, it will start posting "NodeReady==
+	// ConditionUnknown". The amount of time before which NodeController start evicting pods
+	// is controlled via flag 'pod_eviction_timeout'.
+	// Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
+	// in kubelet. There are several constraints:
+	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
+	//    N means number of retries allowed for kubelet to post node status. It is pointless
+	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
+	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
+	//    The constant must be less than podEvictionTimeout.
+	// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
+	//    longer for user to see up-to-date node status.
+	nodeMonitorGracePeriod = 8 * time.Second
+	// The constant is used if sync_nodes_status=False, only for node startup. When node
+	// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
+	nodeStartupGracePeriod = 30 * time.Second
+	// The constant is used if sync_nodes_status=False. It controls NodeController monitoring
+	// period, i.e. how often does NodeController check node status posted from kubelet.
+	// Theoretically, this value should be lower than nodeMonitorGracePeriod.
+	// TODO: Change node status monitor to watch based.
+	nodeMonitorPeriod = 5 * time.Second
+)
+
 var (
 	ErrRegistration = errors.New("unable to register all nodes.")
 	ErrQueryIPAddress = errors.New("unable to query IP address.")
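The const block above ties three timing knobs together: how often kubelet posts status, how long NodeController waits before marking a node Unknown, and how long it waits before evicting pods. A minimal standalone sketch of those constraints (not part of this commit; the 2s posting interval and 5m --pod_eviction_timeout are assumed values used only for illustration):

package main

import (
	"fmt"
	"time"
)

func main() {
	nodeStatusUpdateFrequency := 2 * time.Second // assumed kubelet posting interval
	nodeMonitorGracePeriod := 8 * time.Second    // from the const block above
	podEvictionTimeout := 5 * time.Minute        // assumed --pod_eviction_timeout

	// Constraint 1: the grace period should span several kubelet updates, so a
	// single missed post does not flip NodeReady to Unknown...
	retries := int(nodeMonitorGracePeriod / nodeStatusUpdateFrequency)
	fmt.Printf("kubelet gets %d chances to post status within the grace period\n", retries)

	// ...and it must stay below the eviction timeout.
	fmt.Println("gracePeriod < podEvictionTimeout:", nodeMonitorGracePeriod < podEvictionTimeout)
}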
@@ -49,12 +75,12 @@ type NodeController struct {
 	kubeletClient client.KubeletClient
 	registerRetryCount int
 	podEvictionTimeout time.Duration
-	lookupIP func(host string) ([]net.IP, error)
+	// Method for easy mocking in unittest.
+	lookupIP func(host string) ([]net.IP, error)
+	now func() util.Time
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
-// TODO: NodeController health checker should be a separate package other than
-// kubeletclient, node health check != kubelet health check.
 func NewNodeController(
 	cloud cloudprovider.Interface,
 	matchRE string,
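The lookupIP and now fields above are function values precisely so unit tests can swap in deterministic fakes for DNS and the wall clock, while NewNodeController wires in net.LookupIP and util.Now for production. A minimal sketch of the same pattern using only the standard library (time.Time stands in for util.Time here; names are illustrative, not part of the commit):

package main

import (
	"fmt"
	"net"
	"time"
)

// controller keeps its dependencies as fields so they can be replaced in tests.
type controller struct {
	lookupIP func(host string) ([]net.IP, error)
	now      func() time.Time
}

func newController() *controller {
	// Production wiring: real DNS lookups and the real clock.
	return &controller{lookupIP: net.LookupIP, now: time.Now}
}

func main() {
	c := newController()

	// Test wiring: a fixed clock and a canned DNS answer make results reproducible.
	fixed := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
	c.now = func() time.Time { return fixed }
	c.lookupIP = func(host string) ([]net.IP, error) {
		return []net.IP{net.ParseIP("1.2.3.4")}, nil
	}

	ips, _ := c.lookupIP("node0")
	fmt.Println(c.now(), ips)
}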
@@ -74,85 +100,85 @@ func NewNodeController(
 		registerRetryCount: registerRetryCount,
 		podEvictionTimeout: podEvictionTimeout,
 		lookupIP: net.LookupIP,
+		now: util.Now,
 	}
 }
 
-// Run creates initial node list and start syncing instances from cloudprovider if any.
-// It also starts syncing cluster node status.
+// Run creates initial node list and start syncing instances from cloudprovider, if any.
+// It also starts syncing or monitoring cluster node status.
 // 1. RegisterNodes() is called only once to register all initial nodes (from cloudprovider
 //    or from command line flag). To make cluster bootstrap faster, node controller populates
 //    node addresses.
-// 2. SyncCloud() is called periodically (if enabled) to sync instances from cloudprovider.
+// 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
 //    Node created here will only have specs.
-// 3. SyncNodeStatus() is called periodically (if enabled) to sync node status for nodes in
-//    k8s cluster.
-func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
+// 3. Depending on how k8s is configured, there are two ways of syncing the node status:
+//    3.1 SyncProbedNodeStatus() is called periodically to trigger master to probe kubelet,
+//        and incorporate the resulting node status.
+//    3.2 MonitorNodeStatus() is called periodically to incorporate the results of node status
+//        pushed from kubelet to master.
+func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
 	// Register intial set of nodes with their status set.
 	var nodes *api.NodeList
 	var err error
-	if s.isRunningCloudProvider() {
+	if nc.isRunningCloudProvider() {
 		if syncNodeList {
-			nodes, err = s.GetCloudNodesWithSpec()
-			if err != nil {
+			if nodes, err = nc.GetCloudNodesWithSpec(); err != nil {
 				glog.Errorf("Error loading initial node from cloudprovider: %v", err)
 			}
 		} else {
 			nodes = &api.NodeList{}
 		}
 	} else {
-		nodes, err = s.GetStaticNodesWithSpec()
-		if err != nil {
+		if nodes, err = nc.GetStaticNodesWithSpec(); err != nil {
 			glog.Errorf("Error loading initial static nodes: %v", err)
 		}
 	}
-	nodes, err = s.PopulateAddresses(nodes)
-	if err != nil {
+	if nodes, err = nc.PopulateAddresses(nodes); err != nil {
 		glog.Errorf("Error getting nodes ips: %v", err)
 	}
-	if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil {
+	if err = nc.RegisterNodes(nodes, nc.registerRetryCount, period); err != nil {
 		glog.Errorf("Error registering node list %+v: %v", nodes, err)
 	}
 
 	// Start syncing node list from cloudprovider.
-	if syncNodeList && s.isRunningCloudProvider() {
+	if syncNodeList && nc.isRunningCloudProvider() {
 		go util.Forever(func() {
-			if err = s.SyncCloud(); err != nil {
+			if err = nc.SyncCloudNodes(); err != nil {
 				glog.Errorf("Error syncing cloud: %v", err)
 			}
 		}, period)
 	}
 
+	// Start syncing or monitoring node status.
 	if syncNodeStatus {
-		// Start syncing node status.
 		go util.Forever(func() {
-			if err = s.SyncNodeStatus(); err != nil {
+			if err = nc.SyncProbedNodeStatus(); err != nil {
 				glog.Errorf("Error syncing status: %v", err)
 			}
 		}, period)
 	} else {
-		// Start checking node reachability and evicting timeouted pods.
 		go util.Forever(func() {
-			if err = s.EvictTimeoutedPods(); err != nil {
-				glog.Errorf("Error evicting timeouted pods: %v", err)
+			if err = nc.MonitorNodeStatus(); err != nil {
+				glog.Errorf("Error monitoring node status: %v", err)
 			}
-		}, period)
+		}, nodeMonitorPeriod)
 	}
 }
 
 // RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
-func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
+func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
 	if len(nodes.Items) == 0 {
 		return nil
 	}
 
 	registered := util.NewStringSet()
-	nodes = s.canonicalizeName(nodes)
+	nodes = nc.canonicalizeName(nodes)
 	for i := 0; i < retryCount; i++ {
 		for _, node := range nodes.Items {
 			if registered.Has(node.Name) {
 				continue
 			}
-			_, err := s.kubeClient.Nodes().Create(&node)
+			_, err := nc.kubeClient.Nodes().Create(&node)
 			if err == nil || apierrors.IsAlreadyExists(err) {
 				registered.Insert(node.Name)
 				glog.Infof("Registered node in registry: %s", node.Name)
@@ -173,13 +199,13 @@ func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retr
 	}
 }
 
-// SyncCloud synchronizes the list of instances from cloudprovider to master server.
-func (s *NodeController) SyncCloud() error {
-	matches, err := s.GetCloudNodesWithSpec()
+// SyncCloudNodes synchronizes the list of instances from cloudprovider to master server.
+func (nc *NodeController) SyncCloudNodes() error {
+	matches, err := nc.GetCloudNodesWithSpec()
 	if err != nil {
 		return err
 	}
-	nodes, err := s.kubeClient.Nodes().List()
+	nodes, err := nc.kubeClient.Nodes().List()
 	if err != nil {
 		return err
 	}
@@ -193,7 +219,7 @@ func (s *NodeController) SyncCloud() error {
 	for _, node := range matches.Items {
 		if _, ok := nodeMap[node.Name]; !ok {
 			glog.Infof("Create node in registry: %s", node.Name)
-			_, err = s.kubeClient.Nodes().Create(&node)
+			_, err = nc.kubeClient.Nodes().Create(&node)
 			if err != nil {
 				glog.Errorf("Create node %s error: %v", node.Name, err)
 			}
@@ -204,24 +230,23 @@ func (s *NodeController) SyncCloud() error {
 	// Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
 	for nodeID := range nodeMap {
 		glog.Infof("Delete node from registry: %s", nodeID)
-		err = s.kubeClient.Nodes().Delete(nodeID)
+		err = nc.kubeClient.Nodes().Delete(nodeID)
 		if err != nil {
 			glog.Errorf("Delete node %s error: %v", nodeID, err)
 		}
-		s.deletePods(nodeID)
+		nc.deletePods(nodeID)
 	}
 
 	return nil
 }
 
-// SyncNodeStatus synchronizes cluster nodes status to master server.
-func (s *NodeController) SyncNodeStatus() error {
-	nodes, err := s.kubeClient.Nodes().List()
+// SyncProbedNodeStatus synchronizes cluster nodes status to master server.
+func (nc *NodeController) SyncProbedNodeStatus() error {
+	nodes, err := nc.kubeClient.Nodes().List()
 	if err != nil {
 		return err
 	}
-	nodes = s.UpdateNodesStatus(nodes)
-	nodes, err = s.PopulateAddresses(nodes)
+	nodes, err = nc.PopulateNodesStatus(nodes)
 	if err != nil {
 		return err
 	}
@@ -230,7 +255,7 @@ func (s *NodeController) SyncNodeStatus() error {
 		// useful after we introduce per-probe status field, e.g. 'LastProbeTime', which will
 		// differ in every call of the sync loop.
 		glog.V(2).Infof("updating node %v", node.Name)
-		_, err = s.kubeClient.Nodes().Update(&node)
+		_, err = nc.kubeClient.Nodes().Update(&node)
 		if err != nil {
 			glog.Errorf("error updating node %s: %v", node.Name, err)
 		}
@@ -238,37 +263,127 @@ func (s *NodeController) SyncNodeStatus() error {
 	return nil
 }
 
-// EvictTimeoutedPods verifies if nodes are reachable by checking the time of last probe
-// and deletes pods from not reachable nodes.
-func (s *NodeController) EvictTimeoutedPods() error {
-	nodes, err := s.kubeClient.Nodes().List()
+// PopulateNodesStatus populates node status for given list of nodes.
+func (nc *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) {
+	var wg sync.WaitGroup
+	wg.Add(len(nodes.Items))
+	for i := range nodes.Items {
+		go func(node *api.Node) {
+			node.Status.Conditions = nc.DoCheck(node)
+			if err := nc.populateNodeInfo(node); err != nil {
+				glog.Errorf("Can't collect information for node %s: %v", node.Name, err)
+			}
+			wg.Done()
+		}(&nodes.Items[i])
+	}
+	wg.Wait()
+	return nc.PopulateAddresses(nodes)
+}
+
+// populateNodeInfo gets node info from kubelet and update the node.
+func (nc *NodeController) populateNodeInfo(node *api.Node) error {
+	nodeInfo, err := nc.kubeletClient.GetNodeInfo(node.Name)
 	if err != nil {
 		return err
 	}
-	for _, node := range nodes.Items {
-		if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) {
-			s.deletePods(node.Name)
-		}
+	for key, value := range nodeInfo.Capacity {
+		node.Spec.Capacity[key] = value
 	}
+	node.Status.NodeInfo = nodeInfo.NodeSystemInfo
 	return nil
 }
 
-func latestReadyTime(node *api.Node) util.Time {
-	readyTime := node.ObjectMeta.CreationTimestamp
-	for _, condition := range node.Status.Conditions {
-		if condition.Type == api.NodeReady &&
-			condition.Status == api.ConditionFull &&
-			condition.LastProbeTime.After(readyTime.Time) {
-			readyTime = condition.LastProbeTime
+// DoCheck performs various condition checks for given node.
+func (nc *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
+	var conditions []api.NodeCondition
+
+	// Check Condition: NodeReady. TODO: More node conditions.
+	oldReadyCondition := nc.getCondition(node, api.NodeReady)
+	newReadyCondition := nc.checkNodeReady(node)
+	nc.updateLastTransitionTime(oldReadyCondition, newReadyCondition)
+	if newReadyCondition.Status != api.ConditionFull {
+		// Node is not ready for this probe, we need to check if pods need to be deleted.
+		if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
+			// As long as the node fails, we call delete pods to delete all pods. Node controller sync
+			// is not a closed loop process, there is no feedback from other components regarding pod
+			// status. Keep listing pods to sanity check if pods are all deleted makes more sense.
+			nc.deletePods(node.Name)
+		}
+	}
+	conditions = append(conditions, *newReadyCondition)
+
+	// Check Condition: NodeSchedulable
+	oldSchedulableCondition := nc.getCondition(node, api.NodeSchedulable)
+	newSchedulableCondition := nc.checkNodeSchedulable(node)
+	nc.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition)
+	conditions = append(conditions, *newSchedulableCondition)
+
+	return conditions
+}
+
+// updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition.
+func (nc *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) {
+	if oldCondition != nil && oldCondition.Status == newCondition.Status {
+		// If node status doesn't change, transition time is same as last time.
+		newCondition.LastTransitionTime = oldCondition.LastTransitionTime
+	} else {
+		// Set transition time to Now() if node status changes or `oldCondition` is nil, which
+		// happens only when the node is checked for the first time.
+		newCondition.LastTransitionTime = nc.now()
+	}
+}
+
+// checkNodeSchedulable checks node schedulable condition, without transition timestamp set.
+func (nc *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition {
+	if node.Spec.Unschedulable {
+		return &api.NodeCondition{
+			Type: api.NodeSchedulable,
+			Status: api.ConditionNone,
+			Reason: "User marked unschedulable during node create/update",
+			LastProbeTime: nc.now(),
+		}
+	} else {
+		return &api.NodeCondition{
+			Type: api.NodeSchedulable,
+			Status: api.ConditionFull,
+			Reason: "Node is schedulable by default",
+			LastProbeTime: nc.now(),
+		}
+	}
+}
+
+// checkNodeReady checks raw node ready condition, without transition timestamp set.
+func (nc *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
+	switch status, err := nc.kubeletClient.HealthCheck(node.Name); {
+	case err != nil:
+		glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
+		return &api.NodeCondition{
+			Type: api.NodeReady,
+			Status: api.ConditionUnknown,
+			Reason: fmt.Sprintf("Node health check error: %v", err),
+			LastProbeTime: nc.now(),
+		}
+	case status == probe.Failure:
+		return &api.NodeCondition{
+			Type: api.NodeReady,
+			Status: api.ConditionNone,
+			Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
+			LastProbeTime: nc.now(),
+		}
+	default:
+		return &api.NodeCondition{
+			Type: api.NodeReady,
+			Status: api.ConditionFull,
+			Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
+			LastProbeTime: nc.now(),
 		}
 	}
-	return readyTime
 }
 
 // PopulateAddresses queries Address for given list of nodes.
-func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
-	if s.isRunningCloudProvider() {
-		instances, ok := s.cloud.Instances()
+func (nc *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
+	if nc.isRunningCloudProvider() {
+		instances, ok := nc.cloud.Instances()
 		if !ok {
 			return nodes, ErrCloudInstance
 		}
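DoCheck above only evicts pods once a node has stayed not ready past podEvictionTimeout, measured from the ready condition's LastTransitionTime. A standalone sketch of that rule with concrete times (standard-library types; the 5m timeout is an assumed --pod_eviction_timeout value, not taken from this diff):

package main

import (
	"fmt"
	"time"
)

// shouldEvict mirrors the check in DoCheck: evict once the latest probe falls
// later than the NotReady transition plus the eviction timeout.
func shouldEvict(lastTransition, lastProbe time.Time, podEvictionTimeout time.Duration) bool {
	return lastProbe.After(lastTransition.Add(podEvictionTimeout))
}

func main() {
	podEvictionTimeout := 5 * time.Minute
	wentNotReady := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)

	// A probe 4 minutes after the transition is still inside the timeout: keep pods.
	fmt.Println(shouldEvict(wentNotReady, wentNotReady.Add(4*time.Minute), podEvictionTimeout)) // false
	// A probe 6 minutes after the transition crosses it: pods on the node are deleted.
	fmt.Println(shouldEvict(wentNotReady, wentNotReady.Add(6*time.Minute), podEvictionTimeout)) // true
}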
@@ -289,7 +404,7 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList,
 			address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()}
 			node.Status.Addresses = []api.NodeAddress{address}
 		} else {
-			addrs, err := s.lookupIP(node.Name)
+			addrs, err := nc.lookupIP(node.Name)
 			if err != nil {
 				glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
 			} else if len(addrs) == 0 {
@@ -304,153 +419,93 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList,
 	return nodes, nil
 }
 
-// UpdateNodesStatus performs various condition checks for given list of nodes.
-func (s *NodeController) UpdateNodesStatus(nodes *api.NodeList) *api.NodeList {
-	var wg sync.WaitGroup
-	wg.Add(len(nodes.Items))
+// MonitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
+// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
+// not reachable for a long period of time.
+func (nc *NodeController) MonitorNodeStatus() error {
+	nodes, err := nc.kubeClient.Nodes().List()
+	if err != nil {
+		return err
+	}
 	for i := range nodes.Items {
-		go func(node *api.Node) {
-			node.Status.Conditions = s.DoCheck(node)
-			if err := s.updateNodeInfo(node); err != nil {
-				glog.Errorf("Can't collect information for node %s: %v", node.Name, err)
+		var gracePeriod time.Duration
+		var lastReadyCondition api.NodeCondition
+		node := &nodes.Items[i]
+		readyCondition := nc.getCondition(node, api.NodeReady)
+		if readyCondition == nil {
+			// If ready condition is nil, then kubelet (or nodecontroller) never posted node status.
+			// A fake ready condition is created, where LastProbeTime and LastTransitionTime is set
+			// to node.CreationTimestamp to avoid handle the corner case.
+			lastReadyCondition = api.NodeCondition{
+				Type:               api.NodeReady,
+				Status:             api.ConditionUnknown,
+				LastProbeTime:      node.CreationTimestamp,
+				LastTransitionTime: node.CreationTimestamp,
 			}
-			wg.Done()
-		}(&nodes.Items[i])
-	}
-	wg.Wait()
-	return nodes
+			gracePeriod = nodeStartupGracePeriod
+		} else {
+			// If ready condition is not nil, make a copy of it, since we may modify it in place later.
+			lastReadyCondition = *readyCondition
+			gracePeriod = nodeMonitorGracePeriod
 		}
 
-func (s *NodeController) updateNodeInfo(node *api.Node) error {
-	nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name)
-	if err != nil {
-		return err
-	}
-	for key, value := range nodeInfo.Capacity {
-		node.Spec.Capacity[key] = value
-	}
-	node.Status.NodeInfo = nodeInfo.NodeSystemInfo
-	return nil
-}
+		// Check last time when NodeReady was updated.
+		if nc.now().After(lastReadyCondition.LastProbeTime.Add(gracePeriod)) {
+			// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
+			// (regardless of its current value) in the master, without contacting kubelet.
+			if readyCondition == nil {
+				glog.V(2).Infof("node %v is never updated by kubelet")
+				node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
+					Type:               api.NodeReady,
+					Status:             api.ConditionUnknown,
+					Reason:             fmt.Sprintf("Kubelet never posted node status"),
+					LastProbeTime:      node.CreationTimestamp,
+					LastTransitionTime: nc.now(),
+				})
+			} else {
+				glog.V(2).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v",
+					node.Name, nc.now().Time.Sub(lastReadyCondition.LastProbeTime.Time), lastReadyCondition)
+				if lastReadyCondition.Status != api.ConditionUnknown {
+					readyCondition.Status = api.ConditionUnknown
+					readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status")
+					// LastProbeTime is the last time we heard from kubelet.
+					readyCondition.LastProbeTime = lastReadyCondition.LastProbeTime
+					readyCondition.LastTransitionTime = nc.now()
+				}
+			}
+			_, err = nc.kubeClient.Nodes().Update(node)
+			if err != nil {
+				glog.Errorf("error updating node %s: %v", node.Name, err)
+			}
+		}
 
-// DoCheck performs various condition checks for given node.
-func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
-	var conditions []api.NodeCondition
-
-	// Check Condition: NodeReady. TODO: More node conditions.
-	oldReadyCondition := s.getCondition(node, api.NodeReady)
-	newReadyCondition := s.checkNodeReady(node)
-	s.updateLastTransitionTime(oldReadyCondition, newReadyCondition)
-
-	if newReadyCondition.Status != api.ConditionFull {
-		// Node is not ready for this probe, we need to check if pods need to be deleted.
-		if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) {
-			// As long as the node fails, we call delete pods to delete all pods. Node controller sync
-			// is not a closed loop process, there is no feedback from other components regarding pod
-			// status. Keep listing pods to sanity check if pods are all deleted makes more sense.
-			s.deletePods(node.Name)
+		if readyCondition != nil {
+			// Check eviction timeout.
+			if lastReadyCondition.Status == api.ConditionNone &&
+				nc.now().After(lastReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
+				// Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
+				nc.deletePods(node.Name)
+			}
+			if lastReadyCondition.Status == api.ConditionUnknown &&
+				nc.now().After(lastReadyCondition.LastProbeTime.Add(nc.podEvictionTimeout-gracePeriod)) {
+				// Same as above. Note however, since condition unknown is posted by node controller, which means we
+				// need to substract monitoring grace period in order to get the real 'podEvictionTimeout'.
+				nc.deletePods(node.Name)
+			}
 		}
-	conditions = append(conditions, *newReadyCondition)
-
-	// Check Condition: NodeSchedulable
-	oldSchedulableCondition := s.getCondition(node, api.NodeSchedulable)
-	newSchedulableCondition := s.checkNodeSchedulable(node)
-	s.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition)
-	conditions = append(conditions, *newSchedulableCondition)
-
-	return conditions
-}
-
-// updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition.
-func (s *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) {
-	if oldCondition != nil && oldCondition.Status == newCondition.Status {
-		// If node status doesn't change, transition time is same as last time.
-		newCondition.LastTransitionTime = oldCondition.LastTransitionTime
-	} else {
-		// Set transition time to Now() if node status changes or `oldCondition` is nil, which
-		// happens only when the node is checked for the first time.
-		newCondition.LastTransitionTime = util.Now()
-	}
-}
-
-// checkNodeSchedulable checks node schedulable condition, without transition timestamp set.
-func (s *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition {
-	if node.Spec.Unschedulable {
-		return &api.NodeCondition{
-			Type: api.NodeSchedulable,
-			Status: api.ConditionNone,
-			Reason: "User marked unschedulable during node create/update",
-			LastProbeTime: util.Now(),
-		}
-	} else {
-		return &api.NodeCondition{
-			Type: api.NodeSchedulable,
-			Status: api.ConditionFull,
-			Reason: "Node is schedulable by default",
-			LastProbeTime: util.Now(),
-		}
-	}
-}
-
-// checkNodeReady checks raw node ready condition, without transition timestamp set.
-func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
-	switch status, err := s.kubeletClient.HealthCheck(node.Name); {
-	case err != nil:
-		glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
-		return &api.NodeCondition{
-			Type: api.NodeReady,
-			Status: api.ConditionUnknown,
-			Reason: fmt.Sprintf("Node health check error: %v", err),
-			LastProbeTime: util.Now(),
-		}
-	case status == probe.Failure:
-		return &api.NodeCondition{
-			Type: api.NodeReady,
-			Status: api.ConditionNone,
-			Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
-			LastProbeTime: util.Now(),
-		}
-	default:
-		return &api.NodeCondition{
-			Type: api.NodeReady,
-			Status: api.ConditionFull,
-			Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
-			LastProbeTime: util.Now(),
-		}
-	}
-}
-
-// deletePods will delete all pods from master running on given node.
-func (s *NodeController) deletePods(nodeID string) error {
-	glog.V(2).Infof("Delete all pods from %v", nodeID)
-	// TODO: We don't yet have field selectors from client, see issue #1362.
-	pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
-	if err != nil {
-		return err
-	}
-	for _, pod := range pods.Items {
-		if pod.Status.Host != nodeID {
-			continue
-		}
-		glog.V(2).Infof("Delete pod %v", pod.Name)
-		if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil {
-			glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
-		}
-	}
-
+	}
 	return nil
 }
 
 // GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error
-// occurs, an empty NodeList will be returned with a non-nil error info. The
-// method only constructs spec fields for nodes.
-func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
+// occurs, an empty NodeList will be returned with a non-nil error info. The method only
+// constructs spec fields for nodes.
+func (nc *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
 	result := &api.NodeList{}
-	for _, nodeID := range s.nodes {
+	for _, nodeID := range nc.nodes {
 		node := api.Node{
 			ObjectMeta: api.ObjectMeta{Name: nodeID},
-			Spec: api.NodeSpec{Capacity: s.staticResources.Capacity},
+			Spec: api.NodeSpec{Capacity: nc.staticResources.Capacity},
 		}
 		result.Items = append(result.Items, node)
 	}
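For the MonitorNodeStatus path above, the timing works out as follows: once kubelet goes silent, NodeReady flips to Unknown after the grace period, and pods are evicted once the last heard-from time is older than podEvictionTimeout minus that grace period. A standalone sketch using the constants from this diff and an assumed 5m --pod_eviction_timeout:

package main

import (
	"fmt"
	"time"
)

func main() {
	nodeMonitorGracePeriod := 8 * time.Second // from the const block in this commit
	podEvictionTimeout := 5 * time.Minute     // assumed --pod_eviction_timeout

	// Suppose kubelet last posted NodeReady at lastHeard and then went silent.
	lastHeard := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)

	// MonitorNodeStatus marks NodeReady as Unknown once the posted status is
	// older than the grace period...
	markedUnknown := lastHeard.Add(nodeMonitorGracePeriod)
	// ...and evicts pods once now passes LastProbeTime + (podEvictionTimeout - gracePeriod),
	// subtracting the grace period because the Unknown condition itself was posted late.
	evictAfter := lastHeard.Add(podEvictionTimeout - nodeMonitorGracePeriod)

	fmt.Println("NodeReady becomes Unknown at:", markedUnknown)
	fmt.Println("pods are evicted after:      ", evictAfter)
}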
@@ -458,15 +513,15 @@ func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
 }
 
 // GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error
-// occurs, an empty NodeList will be returned with a non-nil error info. The
-// method only constructs spec fields for nodes.
-func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
+// occurs, an empty NodeList will be returned with a non-nil error info. The method only
+// constructs spec fields for nodes.
+func (nc *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
 	result := &api.NodeList{}
-	instances, ok := s.cloud.Instances()
+	instances, ok := nc.cloud.Instances()
 	if !ok {
 		return result, ErrCloudInstance
 	}
-	matches, err := instances.List(s.matchRE)
+	matches, err := instances.List(nc.matchRE)
 	if err != nil {
 		return result, err
 	}
@@ -478,7 +533,7 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
 			return nil, err
 		}
 		if resources == nil {
-			resources = s.staticResources
+			resources = nc.staticResources
 		}
 		if resources != nil {
 			node.Spec.Capacity = resources.Capacity
@@ -494,13 +549,34 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
 	return result, nil
 }
 
+// deletePods will delete all pods from master running on given node.
+func (nc *NodeController) deletePods(nodeID string) error {
+	glog.V(2).Infof("Delete all pods from %v", nodeID)
+	// TODO: We don't yet have field selectors from client, see issue #1362.
+	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
+	if err != nil {
+		return err
+	}
+	for _, pod := range pods.Items {
+		if pod.Status.Host != nodeID {
+			continue
+		}
+		glog.V(2).Infof("Delete pod %v", pod.Name)
+		if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil {
+			glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
+		}
+	}
+
+	return nil
+}
+
 // isRunningCloudProvider checks if cluster is running with cloud provider.
-func (s *NodeController) isRunningCloudProvider() bool {
-	return s.cloud != nil && len(s.matchRE) > 0
+func (nc *NodeController) isRunningCloudProvider() bool {
+	return nc.cloud != nil && len(nc.matchRE) > 0
 }
 
 // canonicalizeName takes a node list and lowercases all nodes' name.
-func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList {
+func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList {
 	for i := range nodes.Items {
 		nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name)
 	}
@@ -509,7 +585,7 @@ func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList {
 
 // getCondition returns a condition object for the specific condition
 // type, nil if the condition is not set.
-func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition {
+func (nc *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition {
 	for i := range node.Status.Conditions {
 		if node.Status.Conditions[i].Type == conditionType {
 			return &node.Status.Conditions[i]
@@ -39,7 +39,7 @@ import (
 )
 
 // FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
-// allows test cases to have fine-grained control over mock behaviors. We alos need
+// allows test cases to have fine-grained control over mock behaviors. We also need
 // PodsInterface and PodInterface to test list & delet pods, which is implemented in
 // the embeded client.Fake field.
 type FakeNodeHandler struct {
@@ -377,7 +377,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
 	}
 }
 
-func TestSyncCloud(t *testing.T) {
+func TestSyncCloudNodes(t *testing.T) {
 	table := []struct {
 		fakeNodeHandler *FakeNodeHandler
 		fakeCloud *fake_cloud.FakeCloud
@@ -464,7 +464,7 @@ func TestSyncCloud(t *testing.T) {
 
 	for _, item := range table {
 		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
-		if err := nodeController.SyncCloud(); err != nil {
+		if err := nodeController.SyncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
@@ -485,7 +485,7 @@ func TestSyncCloud(t *testing.T) {
 	}
 }
 
-func TestSyncCloudDeletePods(t *testing.T) {
+func TestSyncCloudNodesEvictPods(t *testing.T) {
 	table := []struct {
 		fakeNodeHandler *FakeNodeHandler
 		fakeCloud *fake_cloud.FakeCloud
@@ -546,7 +546,7 @@ func TestSyncCloudDeletePods(t *testing.T) {
 
 	for _, item := range table {
 		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
-		if err := nodeController.SyncCloud(); err != nil {
+		if err := nodeController.SyncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
@@ -563,6 +563,7 @@ func TestSyncCloudDeletePods(t *testing.T) {
 }
 
 func TestNodeConditionsCheck(t *testing.T) {
+	fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
 	table := []struct {
 		node *api.Node
 		fakeKubeletClient *FakeKubeletClient
@@ -578,14 +579,18 @@ func TestNodeConditionsCheck(t *testing.T) {
 			},
 			expectedConditions: []api.NodeCondition{
 				{
 					Type: api.NodeReady,
 					Status: api.ConditionFull,
 					Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
+					LastProbeTime: fakeNow,
+					LastTransitionTime: fakeNow,
 				},
 				{
 					Type: api.NodeSchedulable,
 					Status: api.ConditionFull,
 					Reason: "Node is schedulable by default",
+					LastProbeTime: fakeNow,
+					LastTransitionTime: fakeNow,
 				},
 			},
 		},
@@ -599,14 +604,18 @@ func TestNodeConditionsCheck(t *testing.T) {
 			},
 			expectedConditions: []api.NodeCondition{
 				{
 					Type: api.NodeReady,
 					Status: api.ConditionNone,
 					Reason: "Node health check failed: kubelet /healthz endpoint returns not ok",
+					LastProbeTime: fakeNow,
+					LastTransitionTime: fakeNow,
 				},
 				{
 					Type: api.NodeSchedulable,
 					Status: api.ConditionFull,
 					Reason: "Node is schedulable by default",
+					LastProbeTime: fakeNow,
+					LastTransitionTime: fakeNow,
 				},
 			},
 		},
@@ -620,14 +629,18 @@ func TestNodeConditionsCheck(t *testing.T) {
 			},
 			expectedConditions: []api.NodeCondition{
 				{
 					Type: api.NodeReady,
 					Status: api.ConditionUnknown,
 					Reason: "Node health check error: Error",
+					LastProbeTime: fakeNow,
+					LastTransitionTime: fakeNow,
 				},
 				{
 					Type: api.NodeSchedulable,
 					Status: api.ConditionNone,
 					Reason: "User marked unschedulable during node create/update",
+					LastProbeTime: fakeNow,
+					LastTransitionTime: fakeNow,
 				},
 			},
 		},
@@ -635,17 +648,8 @@ func TestNodeConditionsCheck(t *testing.T) {
 
 	for _, item := range table {
 		nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute)
+		nodeController.now = func() util.Time { return fakeNow }
 		conditions := nodeController.DoCheck(item.node)
-		for i := range conditions {
-			if conditions[i].LastTransitionTime.IsZero() {
-				t.Errorf("unexpected zero last transition timestamp")
-			}
-			if conditions[i].LastProbeTime.IsZero() {
-				t.Errorf("unexpected zero last probe timestamp")
-			}
-			conditions[i].LastTransitionTime = util.Time{}
-			conditions[i].LastProbeTime = util.Time{}
-		}
 		if !reflect.DeepEqual(item.expectedConditions, conditions) {
 			t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions)
 		}
@ -688,16 +692,115 @@ func TestPopulateNodeAddresses(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncNodeStatusTransitionTime(t *testing.T) {
|
func TestSyncProbedNodeStatus(t *testing.T) {
|
||||||
|
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||||
table := []struct {
|
table := []struct {
|
||||||
fakeNodeHandler *FakeNodeHandler
|
fakeNodeHandler *FakeNodeHandler
|
||||||
fakeKubeletClient *FakeKubeletClient
|
fakeKubeletClient *FakeKubeletClient
|
||||||
expectedRequestCount int
|
fakeCloud *fake_cloud.FakeCloud
|
||||||
expectedTransitionTimeChange bool
|
expectedNodes []*api.Node
|
||||||
|
expectedRequestCount int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
|
Existing: []*api.Node{newNode("node0"), newNode("node1")},
|
||||||
|
},
|
||||||
|
fakeKubeletClient: &FakeKubeletClient{
|
||||||
|
Status: probe.Success,
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
fakeCloud: &fake_cloud.FakeCloud{
|
||||||
|
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
|
||||||
|
},
|
||||||
|
expectedNodes: []*api.Node{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node0"},
|
||||||
|
Status: api.NodeStatus{
|
||||||
|
Conditions: []api.NodeCondition{
|
||||||
|
{
|
||||||
|
Type: api.NodeReady,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
|
||||||
|
LastProbeTime: fakeNow,
|
||||||
|
LastTransitionTime: fakeNow,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: api.NodeSchedulable,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
Reason: "Node is schedulable by default",
|
||||||
|
LastProbeTime: fakeNow,
|
||||||
|
LastTransitionTime: fakeNow,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Addresses: []api.NodeAddress{
|
||||||
|
{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node1"},
|
||||||
|
Status: api.NodeStatus{
|
||||||
|
Conditions: []api.NodeCondition{
|
||||||
|
{
|
||||||
|
Type: api.NodeReady,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
|
||||||
|
LastProbeTime: fakeNow,
|
||||||
|
LastTransitionTime: fakeNow,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: api.NodeSchedulable,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
Reason: "Node is schedulable by default",
|
||||||
|
LastProbeTime: fakeNow,
|
||||||
|
LastTransitionTime: fakeNow,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Addresses: []api.NodeAddress{
|
||||||
|
{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedRequestCount: 3, // List + 2xUpdate
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
|
||||||
|
nodeController.now = func() util.Time { return fakeNow }
|
||||||
|
if err := nodeController.SyncProbedNodeStatus(); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
|
||||||
|
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
|
||||||
|
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
|
||||||
|
}
|
||||||
|
// Second sync will also update the node.
|
||||||
|
item.fakeNodeHandler.RequestCount = 0
|
||||||
|
if err := nodeController.SyncProbedNodeStatus(); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
|
||||||
|
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
|
||||||
|
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||||
|
table := []struct {
|
||||||
|
fakeNodeHandler *FakeNodeHandler
|
||||||
|
fakeKubeletClient *FakeKubeletClient
|
||||||
|
expectedRequestCount int
|
||||||
|
expectedTransitionTime util.Time
|
||||||
  }{
    {
      // Existing node is healthy, current probe is healthy too.
      // Existing node is schedulable, again explicitly mark node as schedulable.
+      // Expect transition time to stay the same as before.
      fakeNodeHandler: &FakeNodeHandler{
        Existing: []*api.Node{
          {
@@ -726,12 +829,13 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
        Status: probe.Success,
        Err: nil,
      },
      expectedRequestCount: 2, // List+Update
-      expectedTransitionTimeChange: false,
+      expectedTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
    },
    {
      // Existing node is healthy, current probe is unhealthy.
      // Existing node is schedulable, mark node as unschedulable.
+      // Expect transition time to be now.
      fakeNodeHandler: &FakeNodeHandler{
        Existing: []*api.Node{
          {
@@ -760,153 +864,32 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
        Status: probe.Failure,
        Err: nil,
      },
      expectedRequestCount: 2, // List+Update
-      expectedTransitionTimeChange: true,
+      expectedTransitionTime: fakeNow,
    },
  }

  for _, item := range table {
    nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
-    nodeController.lookupIP = func(host string) ([]net.IP, error) {
-      return nil, fmt.Errorf("lookup %v: no such host", host)
-    }
-    if err := nodeController.SyncNodeStatus(); err != nil {
+    nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
+    nodeController.now = func() util.Time { return fakeNow }
+    if err := nodeController.SyncProbedNodeStatus(); err != nil {
      t.Errorf("unexpected error: %v", err)
    }
    if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
      t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
    }
-    for i := range item.fakeNodeHandler.UpdatedNodes {
-      conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions
-      for j := range conditions {
-        condition := conditions[j]
-        if item.expectedTransitionTimeChange {
-          if !condition.LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
-            t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime)
-          }
-        } else {
-          if !condition.LastTransitionTime.Equal(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
-            t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime)
-          }
-        }
+    for _, node := range item.fakeNodeHandler.UpdatedNodes {
+      for _, condition := range node.Status.Conditions {
+        if !condition.LastTransitionTime.Time.Equal(item.expectedTransitionTime.Time) {
+          t.Errorf("expected last transition time %v, but got %v", item.expectedTransitionTime, condition.LastTransitionTime)
        }
      }
    }
  }
}

-func TestEvictTimeoutedPods(t *testing.T) {
+func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
-  table := []struct {
-    fakeNodeHandler *FakeNodeHandler
-    expectedRequestCount int
-    expectedActions []client.FakeAction
-  }{
-    // Node created long time ago, with no status.
-    {
-      fakeNodeHandler: &FakeNodeHandler{
-        Existing: []*api.Node{
-          {
-            ObjectMeta: api.ObjectMeta{
-              Name: "node0",
-              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
-            },
-          },
-        },
-        Fake: client.Fake{
-          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
-        },
-      },
-      expectedRequestCount: 1, // List
-      expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
-    },
-    // Node created recently, with no status.
-    {
-      fakeNodeHandler: &FakeNodeHandler{
-        Existing: []*api.Node{
-          {
-            ObjectMeta: api.ObjectMeta{
-              Name: "node0",
-              CreationTimestamp: util.Now(),
-            },
-          },
-        },
-        Fake: client.Fake{
-          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
-        },
-      },
-      expectedRequestCount: 1, // List
-      expectedActions: nil,
-    },
-    // Node created long time ago, with status updated long time ago.
-    {
-      fakeNodeHandler: &FakeNodeHandler{
-        Existing: []*api.Node{
-          {
-            ObjectMeta: api.ObjectMeta{
-              Name: "node0",
-              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
-            },
-            Status: api.NodeStatus{
-              Conditions: []api.NodeCondition{
-                {
-                  Type: api.NodeReady,
-                  Status: api.ConditionFull,
-                  LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC),
-                },
-              },
-            },
-          },
-        },
-        Fake: client.Fake{
-          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
-        },
-      },
-      expectedRequestCount: 1, // List
-      expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
-    },
-    // Node created long time ago, with status updated recently.
-    {
-      fakeNodeHandler: &FakeNodeHandler{
-        Existing: []*api.Node{
-          {
-            ObjectMeta: api.ObjectMeta{
-              Name: "node0",
-              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
-            },
-            Status: api.NodeStatus{
-              Conditions: []api.NodeCondition{
-                {
-                  Type: api.NodeReady,
-                  Status: api.ConditionFull,
-                  LastProbeTime: util.Now(),
-                },
-              },
-            },
-          },
-        },
-        Fake: client.Fake{
-          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
-        },
-      },
-      expectedRequestCount: 1, // List
-      expectedActions: nil,
-    },
-  }
-
-  for _, item := range table {
-    nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
-    if err := nodeController.EvictTimeoutedPods(); err != nil {
-      t.Errorf("unexpected error: %v", err)
-    }
-    if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
-      t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
-    }
-    if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
-      t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
-    }
-  }
-}

-func TestSyncNodeStatusDeletePods(t *testing.T) {
  table := []struct {
    fakeNodeHandler *FakeNodeHandler
    fakeKubeletClient *FakeKubeletClient
@@ -1041,10 +1024,8 @@ func TestSyncNodeStatusDeletePods(t *testing.T) {

  for _, item := range table {
    nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute)
-    nodeController.lookupIP = func(host string) ([]net.IP, error) {
-      return nil, fmt.Errorf("lookup %v: no such host", host)
-    }
-    if err := nodeController.SyncNodeStatus(); err != nil {
+    nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
+    if err := nodeController.SyncProbedNodeStatus(); err != nil {
      t.Errorf("unexpected error: %v", err)
    }
    if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
@@ -1056,103 +1037,322 @@ func TestSyncNodeStatusDeletePods(t *testing.T) {
    }
  }

-func TestSyncNodeStatus(t *testing.T) {
+func TestMonitorNodeStatusEvictPods(t *testing.T) {
+  fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
  table := []struct {
    fakeNodeHandler *FakeNodeHandler
-    fakeKubeletClient *FakeKubeletClient
-    fakeCloud *fake_cloud.FakeCloud
-    expectedNodes []*api.Node
-    expectedRequestCount int
+    expectedEvictPods bool
+    evictionTimeout time.Duration
  }{
-    {
-      fakeNodeHandler: &FakeNodeHandler{
-        Existing: []*api.Node{newNode("node0"), newNode("node1")},
-      },
-      fakeKubeletClient: &FakeKubeletClient{
-        Status: probe.Success,
-        Err: nil,
-      },
-      fakeCloud: &fake_cloud.FakeCloud{
-        Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
-      },
-      expectedNodes: []*api.Node{
-        {
-          ObjectMeta: api.ObjectMeta{Name: "node0"},
-          Status: api.NodeStatus{
-            Conditions: []api.NodeCondition{
-              {
-                Type: api.NodeReady,
-                Status: api.ConditionFull,
-                Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
-              },
-              {
-                Type: api.NodeSchedulable,
-                Status: api.ConditionFull,
-                Reason: "Node is schedulable by default",
-              },
-            },
-            Addresses: []api.NodeAddress{
-              {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
-            },
-          },
-        },
-        {
-          ObjectMeta: api.ObjectMeta{Name: "node1"},
-          Status: api.NodeStatus{
-            Conditions: []api.NodeCondition{
-              {
-                Type: api.NodeReady,
-                Status: api.ConditionFull,
-                Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
-              },
-              {
-                Type: api.NodeSchedulable,
-                Status: api.ConditionFull,
-                Reason: "Node is schedulable by default",
-              },
-            },
-            Addresses: []api.NodeAddress{
-              {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
-            },
-          },
-        },
-      },
-      expectedRequestCount: 3, // List + 2xUpdate
-    },
+    // Node created recently, with no status (happens only at cluster startup).
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: fakeNow,
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      evictionTimeout: 30 * time.Minute,
+      expectedEvictPods: false,
+    },
+    // Node created long time ago, and kubelet posted NotReady for a short period of time.
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            },
+            Status: api.NodeStatus{
+              Conditions: []api.NodeCondition{
+                {
+                  Type: api.NodeReady,
+                  Status: api.ConditionNone,
+                  // Node status has just been updated, and transited to NotReady for 10min.
+                  LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC),
+                  LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC),
+                },
+              },
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      evictionTimeout: 30 * time.Minute,
+      expectedEvictPods: false,
+    },
+    // Node created long time ago, and kubelet posted NotReady for a long period of time.
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            },
+            Status: api.NodeStatus{
+              Conditions: []api.NodeCondition{
+                {
+                  Type: api.NodeReady,
+                  Status: api.ConditionNone,
+                  // Node status has just been updated, and transited to NotReady for 1hr.
+                  LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC),
+                  LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
+                },
+              },
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      evictionTimeout: 30 * time.Minute,
+      expectedEvictPods: true,
+    },
+    // Node created long time ago, node controller posted Unknown for a short period of time.
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            },
+            Status: api.NodeStatus{
+              Conditions: []api.NodeCondition{
+                {
+                  Type: api.NodeReady,
+                  Status: api.ConditionUnknown,
+                  // Node status was updated by nodecontroller 10min ago
+                  LastProbeTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC),
+                  LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC),
+                },
+              },
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      evictionTimeout: 30 * time.Minute,
+      expectedEvictPods: false,
+    },
+    // Node created long time ago, node controller posted Unknown for a long period of time.
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            },
+            Status: api.NodeStatus{
+              Conditions: []api.NodeCondition{
+                {
+                  Type: api.NodeReady,
+                  Status: api.ConditionUnknown,
+                  // Node status was updated by nodecontroller 1hr ago
+                  LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
+                  LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
+                },
+              },
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      evictionTimeout: 30 * time.Minute,
+      expectedEvictPods: true,
+    },
  }

  for _, item := range table {
-    nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
-    if err := nodeController.SyncNodeStatus(); err != nil {
-      t.Errorf("unexpected error: %v", err)
-    }
-    if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
-      t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
-    }
-    for i := range item.fakeNodeHandler.UpdatedNodes {
-      conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions
-      for j := range conditions {
-        if conditions[j].LastTransitionTime.IsZero() {
-          t.Errorf("unexpected zero last transition timestamp")
-        }
-        if conditions[j].LastProbeTime.IsZero() {
-          t.Errorf("unexpected zero last probe timestamp")
-        }
-        conditions[j].LastTransitionTime = util.Time{}
-        conditions[j].LastProbeTime = util.Time{}
-      }
-    }
+    nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout)
+    nodeController.now = func() util.Time { return fakeNow }
+    if err := nodeController.MonitorNodeStatus(); err != nil {
+      t.Errorf("unexpected error: %v", err)
+    }
+    podEvicted := false
+    for _, action := range item.fakeNodeHandler.Actions {
+      if action.Action == "delete-pod" {
+        podEvicted = true
+      }
+    }
+    if item.expectedEvictPods != podEvicted {
+      t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted)
+    }
+  }
+}

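For orientation, the eviction rule these table cases encode can be read directly off the fixtures: with a 30-minute evictionTimeout, pods are evicted only when the node's Ready condition has been NotReady or Unknown for longer than that timeout. Below is a minimal, standalone sketch of that predicate under those assumptions; shouldEvictPods is a hypothetical helper written for illustration, not the node controller's actual code, and it uses plain time.Time values instead of the controller's own types.

package main

import (
    "fmt"
    "time"
)

// shouldEvictPods mirrors the rule exercised above: evict only when the node is not
// Ready and the condition has stayed in that state longer than evictionTimeout.
func shouldEvictPods(ready bool, lastTransition, now time.Time, evictionTimeout time.Duration) bool {
    if ready {
        return false
    }
    return now.Sub(lastTransition) > evictionTimeout
}

func main() {
    now := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
    timeout := 30 * time.Minute

    // NotReady since 11:50 (10 minutes): still within the timeout, so no eviction.
    fmt.Println(shouldEvictPods(false, time.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC), now, timeout))
    // Unknown since 11:00 (1 hour): past the timeout, so pods are evicted.
    fmt.Println(shouldEvictPods(false, time.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), now, timeout))
}
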
+func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
+  fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
+  table := []struct {
+    fakeNodeHandler *FakeNodeHandler
+    expectedRequestCount int
+    expectedNodes []*api.Node
+  }{
+    // Node created long time ago, without status:
+    // Expect Unknown status posted from node controller.
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      expectedRequestCount: 2, // List+Update
+      expectedNodes: []*api.Node{
+        {
+          ObjectMeta: api.ObjectMeta{
+            Name: "node0",
+            CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+          },
+          Status: api.NodeStatus{
+            Conditions: []api.NodeCondition{
+              {
+                Type: api.NodeReady,
+                Status: api.ConditionUnknown,
+                Reason: fmt.Sprintf("Kubelet never posted node status"),
+                LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+                LastTransitionTime: fakeNow,
+              },
+            },
+          },
+        },
+      },
+    },
+    // Node created recently, without status.
+    // Expect no action from node controller (within startup grace period).
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: fakeNow,
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      expectedRequestCount: 1, // List
+      expectedNodes: nil,
+    },
+    // Node created long time ago, with status updated by kubelet exceeds grace period.
+    // Expect Unknown status posted from node controller.
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            },
+            Status: api.NodeStatus{
+              Conditions: []api.NodeCondition{
+                {
+                  Type: api.NodeReady,
+                  Status: api.ConditionFull,
+                  // Node status hasn't been updated for 1hr.
+                  LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
+                  LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
+                },
+              },
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      expectedRequestCount: 2, // List+Update
+      expectedNodes: []*api.Node{
+        {
+          ObjectMeta: api.ObjectMeta{
+            Name: "node0",
+            CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+          },
+          Status: api.NodeStatus{
+            Conditions: []api.NodeCondition{
+              {
+                Type: api.NodeReady,
+                Status: api.ConditionUnknown,
+                Reason: fmt.Sprintf("Kubelet stopped posting node status"),
+                LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
+                LastTransitionTime: fakeNow,
+              },
+            },
+          },
+        },
+      },
+    },
+    // Node created long time ago, with status updated recently.
+    // Expect no action from node controller (within monitor grace period).
+    {
+      fakeNodeHandler: &FakeNodeHandler{
+        Existing: []*api.Node{
+          {
+            ObjectMeta: api.ObjectMeta{
+              Name: "node0",
+              CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            },
+            Status: api.NodeStatus{
+              Conditions: []api.NodeCondition{
+                {
+                  Type: api.NodeReady,
+                  Status: api.ConditionFull,
+                  // Node status has just been updated.
+                  LastProbeTime: fakeNow,
+                  LastTransitionTime: fakeNow,
+                },
+              },
+            },
+          },
+        },
+        Fake: client.Fake{
+          PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+        },
+      },
+      expectedRequestCount: 1, // List
+      expectedNodes: nil,
+    },
+  }
+
+  for _, item := range table {
+    nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
+    nodeController.now = func() util.Time { return fakeNow }
+    if err := nodeController.MonitorNodeStatus(); err != nil {
+      t.Errorf("unexpected error: %v", err)
+    }
+    if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
+      t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
+    }
    if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
      t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
    }
-    // Second sync will also update the node.
-    item.fakeNodeHandler.RequestCount = 0
-    if err := nodeController.SyncNodeStatus(); err != nil {
-      t.Errorf("unexpected error: %v", err)
-    }
-    if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
-      t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
-    }
  }
}

@@ -75,7 +75,19 @@ const (
  initialNodeStatusUpdateFrequency = 100 * time.Millisecond
  nodeStatusUpdateFrequencyInc = 500 * time.Millisecond

-  // The retry count for updating node status at each sync period.
+  // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
+  // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
+  // in nodecontroller. There are several constraints:
+  // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
+  // N means number of retries allowed for kubelet to post node status. It is pointless
+  // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
+  // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
+  // The constant must be less than podEvictionTimeout.
+  // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
+  // status. Kubelet may fail to update node status reliably if the value is too small,
+  // as it takes time to gather all necessary node information.
+  nodeStatusUpdateFrequency = 2 * time.Second
+  // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
  nodeStatusUpdateRetry = 5
)

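To make the constraints described in the comment above concrete, the sketch below checks them numerically. nodeStatusUpdateFrequency and nodeStatusUpdateRetry come from the constants in this hunk; nodeMonitorGracePeriod and podEvictionTimeout are placeholder values assumed purely for illustration, since the node controller's actual settings are not part of this diff.

package main

import (
    "fmt"
    "time"
)

func main() {
    // From the kubelet constants above.
    nodeStatusUpdateFrequency := 2 * time.Second
    nodeStatusUpdateRetry := 5

    // Assumed example values; the real ones live in the node controller.
    nodeMonitorGracePeriod := 40 * time.Second
    podEvictionTimeout := 5 * time.Minute

    // Constraint 1: the grace period should cover several kubelet posting attempts...
    if nodeMonitorGracePeriod < time.Duration(nodeStatusUpdateRetry)*nodeStatusUpdateFrequency {
        fmt.Println("grace period too short for the allowed number of kubelet retries")
    }
    // ...and should itself stay below the pod eviction timeout.
    if nodeMonitorGracePeriod >= podEvictionTimeout {
        fmt.Println("grace period should be less than podEvictionTimeout")
    }
    fmt.Println("constraints hold for these values")
}
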
@@ -124,7 +136,6 @@ func NewMainKubelet(
  streamingConnectionIdleTimeout time.Duration,
  recorder record.EventRecorder,
  cadvisorInterface cadvisor.Interface,
-  statusUpdateFrequency time.Duration,
  imageGCPolicy ImageGCPolicy) (*Kubelet, error) {
  if rootDirectory == "" {
    return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@@ -202,7 +213,6 @@ func NewMainKubelet(
    dockerClient: dockerClient,
    kubeClient: kubeClient,
    rootDirectory: rootDirectory,
-    statusUpdateFrequency: statusUpdateFrequency,
    resyncInterval: resyncInterval,
    podInfraContainerImage: podInfraContainerImage,
    containerIDToRef: map[string]*api.ObjectReference{},
@@ -275,7 +285,6 @@ type Kubelet struct {
  rootDirectory string
  podInfraContainerImage string
  podWorkers *podWorkers
-  statusUpdateFrequency time.Duration
  resyncInterval time.Duration
  sourcesReady SourcesReadyFn

@@ -532,7 +541,8 @@ func (kl *Kubelet) syncNodeStatus() {
  if kl.kubeClient == nil {
    return
  }
-  for feq := initialNodeStatusUpdateFrequency; feq < kl.statusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
+  for feq := initialNodeStatusUpdateFrequency; feq < nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
    select {
    case <-time.After(feq):
      if err := kl.updateNodeStatus(); err != nil {
@@ -542,7 +552,7 @@ func (kl *Kubelet) syncNodeStatus() {
  }
  for {
    select {
-    case <-time.After(kl.statusUpdateFrequency):
+    case <-time.After(nodeStatusUpdateFrequency):
      if err := kl.updateNodeStatus(); err != nil {
        glog.Errorf("Unable to update node status: %v", err)
      }
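The syncNodeStatus change above drops the per-kubelet statusUpdateFrequency field in favor of the package constant: the kubelet warms up quickly and then settles into a fixed posting interval. A small standalone sketch of the resulting schedule, using only the constant values present in this diff (100ms initial, 500ms increment, 2s steady state):

package main

import (
    "fmt"
    "time"
)

func main() {
    initial := 100 * time.Millisecond // initialNodeStatusUpdateFrequency
    inc := 500 * time.Millisecond     // nodeStatusUpdateFrequencyInc
    steady := 2 * time.Second         // nodeStatusUpdateFrequency

    // Warm-up waits: 100ms, 600ms, 1.1s, 1.6s ...
    for freq := initial; freq < steady; freq += inc {
        fmt.Println("warm-up wait:", freq)
    }
    // ...after which every status post happens on a fixed 2s interval.
    fmt.Println("steady-state wait:", steady)
}
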
@@ -1837,20 +1847,23 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
    node.Spec.Capacity = CapacityFromMachineInfo(info)
  }

+  currentTime := util.Now()
  newCondition := api.NodeCondition{
    Type: api.NodeReady,
    Status: api.ConditionFull,
    Reason: fmt.Sprintf("kubelet is posting ready status"),
-    LastProbeTime: util.Now(),
+    LastProbeTime: currentTime,
  }
  updated := false
  for i := range node.Status.Conditions {
    if node.Status.Conditions[i].Type == api.NodeReady {
+      newCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
      node.Status.Conditions[i] = newCondition
      updated = true
    }
  }
  if !updated {
+    newCondition.LastTransitionTime = currentTime
    node.Status.Conditions = append(node.Status.Conditions, newCondition)
  }

@@ -3102,10 +3102,11 @@ func TestUpdateNewNodeStatus(t *testing.T) {
      Status: api.NodeStatus{
        Conditions: []api.NodeCondition{
          {
            Type: api.NodeReady,
            Status: api.ConditionFull,
            Reason: fmt.Sprintf("kubelet is posting ready status"),
            LastProbeTime: util.Time{},
+            LastTransitionTime: util.Time{},
          },
        },
        NodeInfo: api.NodeSystemInfo{
@@ -3128,7 +3129,11 @@ func TestUpdateNewNodeStatus(t *testing.T) {
  if updatedNode.Status.Conditions[0].LastProbeTime.IsZero() {
    t.Errorf("unexpected zero last probe timestamp")
  }
+  if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
+    t.Errorf("unexpected zero last transition timestamp")
+  }
  updatedNode.Status.Conditions[0].LastProbeTime = util.Time{}
+  updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
  if !reflect.DeepEqual(expectedNode, updatedNode) {
    t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
  }
@@ -3151,10 +3156,11 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
      Status: api.NodeStatus{
        Conditions: []api.NodeCondition{
          {
            Type: api.NodeReady,
            Status: api.ConditionFull,
            Reason: fmt.Sprintf("kubelet is posting ready status"),
            LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+            LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
          },
        },
      },
@@ -3173,10 +3179,11 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
      Status: api.NodeStatus{
        Conditions: []api.NodeCondition{
          {
            Type: api.NodeReady,
            Status: api.ConditionFull,
            Reason: fmt.Sprintf("kubelet is posting ready status"),
            LastProbeTime: util.Time{}, // placeholder
+            LastTransitionTime: util.Time{}, // placeholder
          },
        },
        NodeInfo: api.NodeSystemInfo{
@@ -3196,11 +3203,16 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
  if !ok {
    t.Errorf("unexpected object type")
  }
+  // Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
  if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastProbeTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
-    t.Errorf("expected \n%v\n, got \n%v", updatedNode.Status.Conditions[0].LastProbeTime,
+    t.Errorf("expected \n%v\n, got \n%v", util.Now(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
+  }
+  if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
+    t.Errorf("expected \n%v\n, got \n%v", updatedNode.Status.Conditions[0].LastTransitionTime,
      util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
  }
  updatedNode.Status.Conditions[0].LastProbeTime = util.Time{}
+  updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
  if !reflect.DeepEqual(expectedNode, updatedNode) {
    t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
  }