From c793c4f0ab946a5575dbbe3e1bbefc67e878a4eb Mon Sep 17 00:00:00 2001
From: Deyuan Deng
Date: Fri, 16 Jan 2015 17:28:20 -0500
Subject: [PATCH] Sync node status from node controller to master.

---
 cmd/integration/integration.go                |   4 +-
 cmd/kube-apiserver/apiserver.go               |   1 -
 .../controller-manager.go                     |  18 +-
 pkg/api/validation/validation.go              |   8 +-
 pkg/api/validation/validation_test.go         |  14 -
 .../controller/nodecontroller.go              | 229 +++++++--
 .../controller/nodecontroller_test.go         | 479 +++++++++++++-----
 pkg/master/master.go                          |  12 +-
 pkg/probe/http/http.go                        |   4 +-
 pkg/probe/tcp/tcp.go                          |   4 +-
 pkg/registry/minion/healthy_registry.go       | 125 -----
 pkg/registry/minion/healthy_registry_test.go  | 114 -----
 pkg/registry/minion/rest_test.go              |  62 +--
 pkg/standalone/standalone.go                  |   6 +-
 plugin/pkg/scheduler/factory/factory.go       |   5 +-
 test/integration/auth_test.go                 |   4 +-
 16 files changed, 584 insertions(+), 505 deletions(-)
 delete mode 100644 pkg/registry/minion/healthy_registry.go
 delete mode 100644 pkg/registry/minion/healthy_registry_test.go

diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index d9a41e4ea4b..72b10f039a8 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -184,8 +184,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
 	controllerManager.Run(10 * time.Minute)
 
 	nodeResources := &api.NodeResources{}
-	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl)
-	nodeController.Run(10 * time.Second)
+	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{})
+	nodeController.Run(10*time.Second, 10)
 
 	// Kubelet (localhost)
 	testRootDir := makeTempDirOrDie("kubelet_integ_1.")
diff --git a/cmd/kube-apiserver/apiserver.go b/cmd/kube-apiserver/apiserver.go
index d2fc7b9e887..dc19730163a 100644
--- a/cmd/kube-apiserver/apiserver.go
+++ b/cmd/kube-apiserver/apiserver.go
@@ -184,7 +184,6 @@ func main() {
 		Client:             client,
 		Cloud:              cloud,
 		EtcdHelper:         helper,
-		HealthCheckMinions: *healthCheckMinions,
 		EventTTL:           *eventTTL,
 		KubeletClient:      kubeletClient,
 		PortalNet:          &n,
diff --git a/cmd/kube-controller-manager/controller-manager.go b/cmd/kube-controller-manager/controller-manager.go
index 02d98ad0683..1eeb101b535 100644
--- a/cmd/kube-controller-manager/controller-manager.go
+++ b/cmd/kube-controller-manager/controller-manager.go
@@ -53,18 +53,22 @@ var (
 	nodeSyncPeriod = flag.Duration("node_sync_period", 10*time.Second, ""+
 		"The period for syncing nodes from cloudprovider. Longer periods will result in "+
 		"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
+	resourceQuotaSyncPeriod = flag.Duration("resource_quota_sync_period", 10*time.Second, "The period for syncing quota usage status in the system")
+	registerRetryCount      = flag.Int("register_retry_count", 10, ""+
+		"The number of retries for initial node registration. Retry interval equals node_sync_period.")
 	machineList util.StringList

 	// TODO: Discover these by pinging the host machines, and rip out these flags.
 	// TODO: in the meantime, use resource.QuantityFlag() instead of these
-	nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
-	nodeMemory   = resource.QuantityFlag("node_memory", "3Gi", "The amount of memory (in bytes) provisioned on each node")
-	resourceQuotaSyncPeriod = flag.Duration("resource_quota_sync_period", 10*time.Second, "The period for syncing quota usage status in the system")
+	nodeMilliCPU  = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
+	nodeMemory    = resource.QuantityFlag("node_memory", "3Gi", "The amount of memory (in bytes) provisioned on each node")
+	kubeletConfig = client.KubeletConfig{Port: ports.KubeletPort, EnableHttps: false}
 )
 
 func init() {
 	flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
 	flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
 	client.BindClientConfigFlags(flag.CommandLine, clientConfig)
+	client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig)
 }
 
 func verifyMinionFlags() {
@@ -104,6 +108,10 @@ func main() {
 	controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
 	controllerManager.Run(10 * time.Second)
 
+	kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
+	if err != nil {
+		glog.Fatalf("Failed to start kubelet client: %v", err)
+	}
 	cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile)
 	nodeResources := &api.NodeResources{
 		Capacity: api.ResourceList{
@@ -111,8 +119,8 @@ func main() {
 			api.ResourceMemory: *nodeMemory,
 		},
 	}
-	nodeController := nodeControllerPkg.NewNodeController(cloud, *minionRegexp, machineList, nodeResources, kubeClient)
-	nodeController.Run(*nodeSyncPeriod)
+	nodeController := nodeControllerPkg.NewNodeController(cloud, *minionRegexp, machineList, nodeResources, kubeClient, kubeletClient)
+	nodeController.Run(*nodeSyncPeriod, *registerRetryCount)
 
 	resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
 	resourceQuotaManager.Run(*resourceQuotaSyncPeriod)
diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go
index 24819a87587..2271f8e20ea 100644
--- a/pkg/api/validation/validation.go
+++ b/pkg/api/validation/validation.go
@@ -673,9 +673,11 @@ func ValidateMinionUpdate(oldMinion *api.Node, minion *api.Node) errs.Validation
 	allErrs := errs.ValidationErrorList{}
 	allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldMinion.ObjectMeta, &minion.ObjectMeta).Prefix("metadata")...)
 
-	if !api.Semantic.DeepEqual(minion.Status, api.NodeStatus{}) {
-		allErrs = append(allErrs, errs.NewFieldInvalid("status", minion.Status, "status must be empty"))
-	}
+	// TODO: Enable this code once we have a better API object.status update model. Currently,
+	// anyone can update node status.
+	// if !api.Semantic.DeepEqual(minion.Status, api.NodeStatus{}) {
+	// 	allErrs = append(allErrs, errs.NewFieldInvalid("status", minion.Status, "status must be empty"))
+	// }
 
 	// TODO: move reset function to its own location
 	// Ignore metadata changes now that they have been tested
diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go
index 6943cd129a3..d91db387e92 100644
--- a/pkg/api/validation/validation_test.go
+++ b/pkg/api/validation/validation_test.go
@@ -1476,20 +1476,6 @@ func TestValidateMinionUpdate(t *testing.T) {
 			},
 		},
 	}, true},
-		{api.Node{
-			ObjectMeta: api.ObjectMeta{
-				Name:   "foo",
-				Labels: map[string]string{"bar": "foo"},
-			},
-		}, api.Node{
-			ObjectMeta: api.ObjectMeta{
-				Name:   "foo",
-				Labels: map[string]string{"bar": "fooobaz"},
-			},
-			Status: api.NodeStatus{
-				HostIP: "1.2.3.4",
-			},
-		}, false},
 		{api.Node{
 			ObjectMeta: api.ObjectMeta{
 				Name: "foo",
diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go
index de2a619de81..25fb005ff9d 100644
--- a/pkg/cloudprovider/controller/nodecontroller.go
+++ b/pkg/cloudprovider/controller/nodecontroller.go
@@ -17,97 +17,125 @@ limitations under the License.
 package controller
 
 import (
+	"errors"
 	"fmt"
 	"net"
+	"reflect"
+	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/golang/glog"
 )
 
+var (
+	ErrRegistration = errors.New("unable to register all nodes")
+)
+
 type NodeController struct {
 	cloud           cloudprovider.Interface
 	matchRE         string
 	staticResources *api.NodeResources
 	nodes           []string
 	kubeClient      client.Interface
+	kubeletClient   client.KubeletHealthChecker
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
+// TODO: NodeController health checker should be a separate package other than
+// kubeletclient; node health check != kubelet health check.
 func NewNodeController(
 	cloud cloudprovider.Interface,
 	matchRE string,
 	nodes []string,
 	staticResources *api.NodeResources,
-	kubeClient client.Interface) *NodeController {
+	kubeClient client.Interface,
+	kubeletClient client.KubeletHealthChecker) *NodeController {
 	return &NodeController{
 		cloud:           cloud,
 		matchRE:         matchRE,
 		nodes:           nodes,
 		staticResources: staticResources,
 		kubeClient:      kubeClient,
+		kubeletClient:   kubeletClient,
 	}
 }
 
-// Run starts syncing instances from cloudprovider periodically, or create initial node list.
-func (s *NodeController) Run(period time.Duration) {
-	if s.cloud != nil && len(s.matchRE) > 0 {
+// Run creates the initial node list and starts syncing instances from the cloudprovider,
+// if any. It also starts syncing cluster node status.
+func (s *NodeController) Run(period time.Duration, retryCount int) {
+	// Register the initial set of nodes with their status set.
+	var nodes *api.NodeList
+	var err error
+	if s.isRunningCloudProvider() {
+		nodes, err = s.CloudNodes()
+		if err != nil {
+			glog.Errorf("Error loading initial nodes from cloudprovider: %v", err)
+		}
+	} else {
+		nodes, err = s.StaticNodes()
+		if err != nil {
+			glog.Errorf("Error loading initial static nodes: %v", err)
+		}
+	}
+	nodes = s.DoChecks(nodes)
+	if err := s.RegisterNodes(nodes, retryCount, period); err != nil {
+		glog.Errorf("Error registering node list: %+v", nodes)
+	}
+
+	// Start syncing node list from cloudprovider.
+	if s.isRunningCloudProvider() {
 		go util.Forever(func() {
 			if err := s.SyncCloud(); err != nil {
 				glog.Errorf("Error syncing cloud: %v", err)
 			}
 		}, period)
-	} else {
-		go s.SyncStatic(period)
 	}
+
+	// Start syncing node status.
+	go util.Forever(func() {
+		if err := s.SyncNodeStatus(); err != nil {
+			glog.Errorf("Error syncing status: %v", err)
+		}
+	}, period)
 }
 
-// SyncStatic registers list of machines from command line flag. It returns after successful
-// registration of all machines.
-func (s *NodeController) SyncStatic(period time.Duration) error {
+// RegisterNodes registers the given list of nodes; it retries up to retryCount times,
+// waiting retryInterval between attempts.
+func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
 	registered := util.NewStringSet()
-	for {
-		for _, nodeID := range s.nodes {
-			if registered.Has(nodeID) {
+	for i := 0; i < retryCount; i++ {
+		for _, node := range nodes.Items {
+			if registered.Has(node.Name) {
 				continue
 			}
-			node := &api.Node{
-				ObjectMeta: api.ObjectMeta{Name: nodeID},
-				Spec: api.NodeSpec{
-					Capacity: s.staticResources.Capacity,
-				},
-			}
-			addr := net.ParseIP(nodeID)
-			if addr != nil {
-				node.Status.HostIP = nodeID
-			} else {
-				addrs, err := net.LookupIP(nodeID)
-				if err != nil {
-					glog.Errorf("Can't get ip address of node %v", nodeID)
-				} else if len(addrs) == 0 {
-					glog.Errorf("No ip address for node %v", nodeID)
-				} else {
-					node.Status.HostIP = addrs[0].String()
-				}
-			}
-			_, err := s.kubeClient.Nodes().Create(node)
+			_, err := s.kubeClient.Nodes().Create(&node)
 			if err == nil {
-				registered.Insert(nodeID)
+				registered.Insert(node.Name)
+				glog.Infof("Registered node in registry: %s", node.Name)
+			} else {
+				glog.Errorf("Error registering node %s, retrying: %v", node.Name, err)
+			}
+			if registered.Len() == len(nodes.Items) {
+				glog.Infof("Successfully registered all nodes")
+				return nil
 			}
 		}
-		if registered.Len() == len(s.nodes) {
-			return nil
-		}
-		time.Sleep(period)
+		time.Sleep(retryInterval)
+	}
+	if registered.Len() != len(nodes.Items) {
+		return ErrRegistration
 	}
+	return nil
 }
 
-// SyncCloud syncs list of instances from cloudprovider to master etcd registry.
+// SyncCloud synchronizes the list of instances from the cloudprovider to the master server.
 func (s *NodeController) SyncCloud() error {
-	matches, err := s.cloudNodes()
+	matches, err := s.CloudNodes()
 	if err != nil {
 		return err
 	}
@@ -120,7 +148,7 @@ func (s *NodeController) SyncCloud() error {
 		nodeMap[node.Name] = &node
 	}
 
-	// Create or delete nodes from registry.
+	// Create nodes which have been created in the cloud, but not in the kubernetes cluster.
 	for _, node := range matches.Items {
 		if _, ok := nodeMap[node.Name]; !ok {
 			glog.Infof("Create node in registry: %s", node.Name)
@@ -132,6 +160,7 @@ func (s *NodeController) SyncCloud() error {
 		delete(nodeMap, node.Name)
 	}
 
+	// Delete nodes which have been deleted from the cloud, but not from the kubernetes cluster.
 	for nodeID := range nodeMap {
 		glog.Infof("Delete node from registry: %s", nodeID)
 		err = s.kubeClient.Nodes().Delete(nodeID)
@@ -139,29 +168,121 @@ func (s *NodeController) SyncCloud() error {
 			glog.Errorf("Delete node error: %s", nodeID)
 		}
 	}
+
 	return nil
 }
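[Editor's note: for orientation, the intended end-to-end wiring of the controller above, abridged from the cmd/kube-controller-manager hunk earlier in this patch (a sketch, not additional patch content):]

	kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
	if err != nil {
		glog.Fatalf("Failed to start kubelet client: %v", err)
	}
	// Run registers the initial node set once (retrying up to register_retry_count
	// times), then leaves the cloud-sync and status-sync loops running forever.
	nodeController := nodeControllerPkg.NewNodeController(
		cloud, *minionRegexp, machineList, nodeResources, kubeClient, kubeletClient)
	nodeController.Run(*nodeSyncPeriod, *registerRetryCount)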
 
-// cloudNodes constructs and returns api.NodeList from cloudprovider.
-func (s *NodeController) cloudNodes() (*api.NodeList, error) {
+// SyncNodeStatus synchronizes cluster node status to the master server.
+func (s *NodeController) SyncNodeStatus() error {
+	nodes, err := s.kubeClient.Nodes().List()
+	if err != nil {
+		return err
+	}
+	oldNodes := make(map[string]api.Node)
+	for _, node := range nodes.Items {
+		oldNodes[node.Name] = node
+	}
+	nodes = s.DoChecks(nodes)
+	for _, node := range nodes.Items {
+		if reflect.DeepEqual(node, oldNodes[node.Name]) {
+			glog.V(2).Infof("skip updating node %v", node.Name)
+			continue
+		}
+		glog.V(2).Infof("updating node %v", node.Name)
+		_, err = s.kubeClient.Nodes().Update(&node)
+		if err != nil {
+			glog.Errorf("error updating node %s: %v", node.Name, err)
+		}
+	}
+	return nil
+}
+
+// DoChecks performs health checking for the given list of nodes.
+func (s *NodeController) DoChecks(nodes *api.NodeList) *api.NodeList {
+	var wg sync.WaitGroup
+	wg.Add(len(nodes.Items))
+	for i := range nodes.Items {
+		go func(node *api.Node) {
+			node.Status.Conditions = s.DoCheck(node)
+			wg.Done()
+		}(&nodes.Items[i])
+	}
+	wg.Wait()
+	return nodes
+}
+
+// DoCheck performs health checking for the given node.
+func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
+	var conditions []api.NodeCondition
+	switch status, err := s.kubeletClient.HealthCheck(node.Name); {
+	case err != nil:
+		glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
+		conditions = append(conditions, api.NodeCondition{
+			Kind:   api.NodeReady,
+			Status: api.ConditionUnknown,
+		})
+	case status == probe.Failure:
+		conditions = append(conditions, api.NodeCondition{
+			Kind:   api.NodeReady,
+			Status: api.ConditionNone,
+		})
+	default:
+		conditions = append(conditions, api.NodeCondition{
+			Kind:   api.NodeReady,
+			Status: api.ConditionFull,
+		})
+	}
+	glog.V(5).Infof("NodeController: node %q status was %+v", node.Name, conditions)
+	return conditions
+}
+
+// StaticNodes constructs and returns api.NodeList for static nodes. If an error
+// occurs, an empty NodeList is returned together with a non-nil error.
+func (s *NodeController) StaticNodes() (*api.NodeList, error) {
+	result := &api.NodeList{}
+	for _, nodeID := range s.nodes {
+		node := api.Node{
+			ObjectMeta: api.ObjectMeta{Name: nodeID},
+			Spec:       api.NodeSpec{Capacity: s.staticResources.Capacity},
+		}
+		addr := net.ParseIP(nodeID)
+		if addr != nil {
+			node.Status.HostIP = nodeID
+		} else {
+			addrs, err := net.LookupIP(nodeID)
+			if err != nil {
+				glog.Errorf("Can't get IP address of node %v", nodeID)
+			} else if len(addrs) == 0 {
+				glog.Errorf("No IP address for node %v", nodeID)
+			} else {
+				node.Status.HostIP = addrs[0].String()
+			}
+		}
+		result.Items = append(result.Items, node)
+	}
+	return result, nil
+}
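[Editor's note: the HostIP fallback in StaticNodes leans on standard-library behavior worth spelling out — net.ParseIP accepts only literal addresses, so hostnames fall through to a DNS lookup. A minimal, runnable illustration of those stdlib semantics (sketch, not patch code):]

	package main

	import (
		"fmt"
		"net"
	)

	func hostIP(nodeID string) string {
		if addr := net.ParseIP(nodeID); addr != nil {
			return nodeID // already a literal IP, e.g. "10.0.0.1"
		}
		addrs, err := net.LookupIP(nodeID) // a hostname: resolve via DNS
		if err != nil || len(addrs) == 0 {
			return "" // mirrors the controller: log and leave HostIP unset
		}
		return addrs[0].String()
	}

	func main() {
		fmt.Println(hostIP("10.0.0.1"))  // "10.0.0.1"
		fmt.Println(hostIP("localhost")) // typically "127.0.0.1" or "::1"
	}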
+
+// CloudNodes constructs and returns api.NodeList from the cloudprovider. If an error
+// occurs, an empty NodeList is returned together with a non-nil error.
+func (s *NodeController) CloudNodes() (*api.NodeList, error) {
+	result := &api.NodeList{}
 	instances, ok := s.cloud.Instances()
 	if !ok {
-		return nil, fmt.Errorf("cloud doesn't support instances")
+		return result, fmt.Errorf("cloud doesn't support instances")
 	}
 	matches, err := instances.List(s.matchRE)
 	if err != nil {
-		return nil, err
-	}
-	result := &api.NodeList{
-		Items: make([]api.Node, len(matches)),
+		return result, err
 	}
 	for i := range matches {
-		result.Items[i].Name = matches[i]
+		node := api.Node{}
+		node.Name = matches[i]
 		hostIP, err := instances.IPAddress(matches[i])
 		if err != nil {
 			glog.Errorf("error getting instance ip address for %s: %v", matches[i], err)
 		} else {
-			result.Items[i].Status.HostIP = hostIP.String()
+			node.Status.HostIP = hostIP.String()
 		}
 		resources, err := instances.GetNodeResources(matches[i])
 		if err != nil {
@@ -171,8 +292,14 @@ func (s *NodeController) cloudNodes() (*api.NodeList, error) {
 			resources = s.staticResources
 		}
 		if resources != nil {
-			result.Items[i].Spec.Capacity = resources.Capacity
+			node.Spec.Capacity = resources.Capacity
 		}
+		result.Items = append(result.Items, node)
 	}
 	return result, nil
 }
+
+// isRunningCloudProvider checks if the cluster is running with a cloud provider.
+func (s *NodeController) isRunningCloudProvider() bool {
+	return s.cloud != nil && len(s.matchRE) > 0
+}
diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go
index 5bc3b882032..9bd47b3d8b1 100644
--- a/pkg/cloudprovider/controller/nodecontroller_test.go
+++ b/pkg/cloudprovider/controller/nodecontroller_test.go
@@ -17,19 +17,22 @@ limitations under the License.
 package controller
 
 import (
+	"errors"
 	"fmt"
+	"net"
+	"reflect"
+	"sort"
 	"testing"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
 )
 
-func newNode(name string) *api.Node {
-	return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}}
-}
-
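[Editor's note: one detail worth flagging before the fakes — Create and Update below deliberately append a copy (`nodeCopy := *node`) rather than the caller's pointer; otherwise the controller's later mutations would silently rewrite the recorded history. A minimal, runnable illustration of that aliasing hazard (sketch, not patch code):]

	package main

	import "fmt"

	type node struct{ name string }

	func main() {
		n := node{name: "node0"}
		history := []*node{&n} // records the caller's pointer
		n.name = "mutated"
		fmt.Println(history[0].name) // "mutated": the record changed retroactively

		m := node{name: "node1"}
		copyOf := m // value copy, as the fakes below do
		snapshot := []*node{&copyOf}
		m.name = "mutated"
		fmt.Println(snapshot[0].name) // "node1": the record is stable
	}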
+// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface.
 type FakeNodeHandler struct {
 	client.Fake
 	client.FakeNodes
@@ -41,6 +44,7 @@ type FakeNodeHandler struct {
 	// Output
 	CreatedNodes []*api.Node
 	DeletedNodes []*api.Node
+	UpdatedNodes []*api.Node
 	RequestCount int
 }
 
@@ -51,7 +55,8 @@ func (c *FakeNodeHandler) Nodes() client.NodeInterface {
 func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
 	defer func() { m.RequestCount++ }()
 	if m.CreateHook == nil || m.CreateHook(m, node) {
-		m.CreatedNodes = append(m.CreatedNodes, node)
+		nodeCopy := *node
+		m.CreatedNodes = append(m.CreatedNodes, &nodeCopy)
 		return node, nil
 	} else {
 		return nil, fmt.Errorf("Create error.")
@@ -60,18 +65,27 @@ func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
 
 func (m *FakeNodeHandler) List() (*api.NodeList, error) {
 	defer func() { m.RequestCount++ }()
-	nodes := []api.Node{}
+	var nodes []*api.Node
+	for i := 0; i < len(m.UpdatedNodes); i++ {
+		if !contains(m.UpdatedNodes[i], m.DeletedNodes) {
+			nodes = append(nodes, m.UpdatedNodes[i])
+		}
+	}
 	for i := 0; i < len(m.Existing); i++ {
-		if !contains(m.Existing[i], m.DeletedNodes) {
-			nodes = append(nodes, *m.Existing[i])
+		if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.Existing[i], nodes) {
+			nodes = append(nodes, m.Existing[i])
 		}
 	}
 	for i := 0; i < len(m.CreatedNodes); i++ {
-		if !contains(m.Existing[i], m.DeletedNodes) {
-			nodes = append(nodes, *m.CreatedNodes[i])
+		if !contains(m.CreatedNodes[i], m.DeletedNodes) && !contains(m.CreatedNodes[i], nodes) {
+			nodes = append(nodes, m.CreatedNodes[i])
 		}
 	}
-	return &api.NodeList{Items: nodes}, nil
+	nodeList := &api.NodeList{}
+	for _, node := range nodes {
+		nodeList.Items = append(nodeList.Items, *node)
+	}
+	return nodeList, nil
 }
 
 func (m *FakeNodeHandler) Delete(id string) error {
@@ -80,142 +94,377 @@ func (m *FakeNodeHandler) Delete(id string) error {
 	return nil
 }
 
-func TestSyncStaticCreateNode(t *testing.T) {
-	fakeNodeHandler := &FakeNodeHandler{
-		CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
-			return true
+func (m *FakeNodeHandler) Update(node *api.Node) (*api.Node, error) {
+	nodeCopy := *node
+	m.UpdatedNodes = append(m.UpdatedNodes, &nodeCopy)
+	m.RequestCount++
+	return node, nil
+}
+
+// FakeKubeletClient is a fake implementation of KubeletClient.
+type FakeKubeletClient struct {
+	Status probe.Status
+	Err    error
+}
+
+func (c *FakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.PodStatusResult, error) {
+	return api.PodStatusResult{}, errors.New("Not Implemented")
+}
+
+func (c *FakeKubeletClient) HealthCheck(host string) (probe.Status, error) {
+	return c.Status, c.Err
+}
+
+func TestRegisterNodes(t *testing.T) {
+	table := []struct {
+		fakeNodeHandler      *FakeNodeHandler
+		machines             []string
+		retryCount           int
+		expectedRequestCount int
+		expectedCreateCount  int
+		expectedFail         bool
+	}{
+		{
+			// Register two nodes normally.
+			machines: []string{"node0", "node1"},
+			fakeNodeHandler: &FakeNodeHandler{
+				CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true },
+			},
+			retryCount:           1,
+			expectedRequestCount: 2,
+			expectedCreateCount:  2,
+			expectedFail:         false,
+		},
+		{
+			// No machine to register.
+			machines: []string{},
+			fakeNodeHandler: &FakeNodeHandler{
+				CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true },
+			},
+			retryCount:           1,
+			expectedRequestCount: 0,
+			expectedCreateCount:  0,
+			expectedFail:         false,
+		},
+		{
+			// Fail the first two requests.
+			machines: []string{"node0", "node1"},
+			fakeNodeHandler: &FakeNodeHandler{
+				CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
+					if fake.RequestCount == 0 || fake.RequestCount == 1 {
+						return false
+					}
+					return true
+				},
+			},
+			retryCount:           10,
+			expectedRequestCount: 4,
+			expectedCreateCount:  2,
+			expectedFail:         false,
+		},
+		{
+			// The first node always fails.
+			machines: []string{"node0", "node1"},
+			fakeNodeHandler: &FakeNodeHandler{
+				CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
+					if node.Name == "node0" {
+						return false
+					}
+					return true
+				},
+			},
+			retryCount:           2,
+			expectedRequestCount: 3, // 2 for node0, 1 for node1
+			expectedCreateCount:  1,
+			expectedFail:         true,
 		},
 	}
-	nodeController := NewNodeController(nil, ".*", []string{"node0"}, &api.NodeResources{}, fakeNodeHandler)
-	if err := nodeController.SyncStatic(time.Millisecond); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if fakeNodeHandler.RequestCount != 1 {
-		t.Errorf("Expected 1 call, but got %v.", fakeNodeHandler.RequestCount)
-	}
-	if len(fakeNodeHandler.CreatedNodes) != 1 {
-		t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
-	}
-	if fakeNodeHandler.CreatedNodes[0].Name != "node0" {
-		t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
+	for _, item := range table {
+		nodes := api.NodeList{}
+		for _, machine := range item.machines {
+			nodes.Items = append(nodes.Items, *newNode(machine))
+		}
+		nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil)
+		err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
+		if !item.expectedFail && err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if item.expectedFail && err == nil {
+			t.Errorf("unexpected non-error")
+		}
+		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
+			t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
+		}
+		if len(item.fakeNodeHandler.CreatedNodes) != item.expectedCreateCount {
+			t.Errorf("expected %v nodes, but got %v.", item.expectedCreateCount, len(item.fakeNodeHandler.CreatedNodes))
+		}
 	}
 }
 
-func TestSyncStaticCreateNodeWithHostIP(t *testing.T) {
-	fakeNodeHandler := &FakeNodeHandler{
-		CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
-			return true
+func TestCreateStaticNodes(t *testing.T) {
+	table := []struct {
+		machines      []string
+		expectedNodes *api.NodeList
+	}{
+		{
+			machines:      []string{},
+			expectedNodes: &api.NodeList{},
+		},
+		{
+			machines: []string{"node0"},
+			expectedNodes: &api.NodeList{
+				Items: []api.Node{
+					{
+						ObjectMeta: api.ObjectMeta{Name: "node0"},
+						Spec:       api.NodeSpec{},
+						Status:     api.NodeStatus{},
+					},
+				},
+			},
 		},
 	}
-	nodeController := NewNodeController(nil, ".*", []string{"10.0.0.1"}, &api.NodeResources{}, fakeNodeHandler)
-	if err := nodeController.SyncStatic(time.Millisecond); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if fakeNodeHandler.CreatedNodes[0].Name != "10.0.0.1" {
-		t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
-	}
-	if fakeNodeHandler.CreatedNodes[0].Status.HostIP != "10.0.0.1" {
-		t.Errorf("unexpect nil node HostIP for node %v", fakeNodeHandler.CreatedNodes[0].Name)
+	for _, item := range table {
+		nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil)
+		nodes, err := nodeController.StaticNodes()
+		if err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if !reflect.DeepEqual(item.expectedNodes, nodes) {
+			t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes)
+		}
 	}
 }
 
-func TestSyncStaticCreateNodeWithError(t *testing.T) {
-	fakeNodeHandler := &FakeNodeHandler{
-		CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
-			if fake.RequestCount == 0 {
-				return false
-			}
-			return true
+func TestCreateCloudNodes(t *testing.T) {
+	resourceList := api.ResourceList{
+		api.ResourceCPU:    *resource.NewMilliQuantity(1000, resource.DecimalSI),
+		api.ResourceMemory: *resource.NewQuantity(3000, resource.DecimalSI),
+	}
+
+	table := []struct {
+		fakeCloud     *fake_cloud.FakeCloud
+		machines      []string
+		expectedNodes *api.NodeList
+	}{
+		{
+			fakeCloud:     &fake_cloud.FakeCloud{},
+			expectedNodes: &api.NodeList{},
+		},
+		{
+			fakeCloud: &fake_cloud.FakeCloud{
+				Machines:      []string{"node0"},
+				IP:            net.ParseIP("1.2.3.4"),
+				NodeResources: &api.NodeResources{Capacity: resourceList},
+			},
+			expectedNodes: &api.NodeList{
+				Items: []api.Node{
+					{
+						ObjectMeta: api.ObjectMeta{Name: "node0"},
+						Spec:       api.NodeSpec{Capacity: resourceList},
+						Status:     api.NodeStatus{HostIP: "1.2.3.4"},
+					},
+				},
+			},
 		},
 	}
-	nodeController := NewNodeController(nil, ".*", []string{"node0"}, &api.NodeResources{}, fakeNodeHandler)
-	if err := nodeController.SyncStatic(time.Millisecond); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if fakeNodeHandler.RequestCount != 2 {
-		t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
-	}
-	if len(fakeNodeHandler.CreatedNodes) != 1 {
-		t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
-	}
-	if fakeNodeHandler.CreatedNodes[0].Name != "node0" {
-		t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
+	for _, item := range table {
+		nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil)
+		nodes, err := nodeController.CloudNodes()
+		if err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if !reflect.DeepEqual(item.expectedNodes, nodes) {
+			t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes)
+		}
 	}
 }
 
-func TestSyncCloudCreateNode(t *testing.T) {
-	fakeNodeHandler := &FakeNodeHandler{
-		Existing: []*api.Node{newNode("node0")},
-	}
-	instances := []string{"node0", "node1"}
-	fakeCloud := fake_cloud.FakeCloud{
-		Machines: instances,
-	}
-	nodeController := NewNodeController(&fakeCloud, ".*", nil, nil, fakeNodeHandler)
-	if err := nodeController.SyncCloud(); err != nil {
-		t.Errorf("unexpected error: %v", err)
+func TestSyncCloud(t *testing.T) {
+	table := []struct {
+		fakeNodeHandler      *FakeNodeHandler
+		fakeCloud            *fake_cloud.FakeCloud
+		matchRE              string
+		expectedRequestCount int
+		expectedCreated      []string
+		expectedDeleted      []string
+	}{
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{newNode("node0")},
+			},
+			fakeCloud: &fake_cloud.FakeCloud{
+				Machines: []string{"node0", "node1"},
+			},
+			matchRE:              ".*",
+			expectedRequestCount: 2, // List + Create
+			expectedCreated:      []string{"node1"},
+			expectedDeleted:      []string{},
+		},
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{newNode("node0"), newNode("node1")},
+			},
+			fakeCloud: &fake_cloud.FakeCloud{
+				Machines: []string{"node0"},
+			},
+			matchRE:              ".*",
+			expectedRequestCount: 2, // List + Delete
+			expectedCreated:      []string{},
+			expectedDeleted:      []string{"node1"},
+		},
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{newNode("node0")},
+			},
+			fakeCloud: &fake_cloud.FakeCloud{
+				Machines: []string{"node0", "node1", "fake"},
+			},
+			matchRE:              "node[0-9]+",
+			expectedRequestCount: 2, // List + Create
+			expectedCreated:      []string{"node1"},
+			expectedDeleted:      []string{},
+		},
 	}
-	if fakeNodeHandler.RequestCount != 2 {
-		t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
-	}
-	if len(fakeNodeHandler.CreatedNodes) != 1 {
-		t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
-	}
-	if fakeNodeHandler.CreatedNodes[0].Name != "node1" {
-		t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
+	for _, item := range table {
+		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil)
+		if err := nodeController.SyncCloud(); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
+			t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
+		}
+		nodes := sortedNodeNames(item.fakeNodeHandler.CreatedNodes)
+		if !reflect.DeepEqual(item.expectedCreated, nodes) {
+			t.Errorf("expected node list %+v, got %+v", item.expectedCreated, nodes)
+		}
+		nodes = sortedNodeNames(item.fakeNodeHandler.DeletedNodes)
+		if !reflect.DeepEqual(item.expectedDeleted, nodes) {
+			t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes)
+		}
 	}
 }
 
-func TestSyncCloudDeleteNode(t *testing.T) {
-	fakeNodeHandler := &FakeNodeHandler{
-		Existing: []*api.Node{newNode("node0"), newNode("node1")},
-	}
-	instances := []string{"node0"}
-	fakeCloud := fake_cloud.FakeCloud{
-		Machines: instances,
-	}
-	nodeController := NewNodeController(&fakeCloud, ".*", nil, nil, fakeNodeHandler)
-	if err := nodeController.SyncCloud(); err != nil {
-		t.Errorf("unexpected error: %v", err)
+func TestHealthCheckNode(t *testing.T) {
+	table := []struct {
+		node               *api.Node
+		fakeKubeletClient  *FakeKubeletClient
+		expectedConditions []api.NodeCondition
+	}{
+		{
+			node: newNode("node0"),
+			fakeKubeletClient: &FakeKubeletClient{
+				Status: probe.Success,
+				Err:    nil,
+			},
+			expectedConditions: []api.NodeCondition{
+				{
+					Kind:   api.NodeReady,
+					Status: api.ConditionFull,
+				},
+			},
+		},
+		{
+			node: newNode("node0"),
+			fakeKubeletClient: &FakeKubeletClient{
+				Status: probe.Failure,
+				Err:    nil,
+			},
+			expectedConditions: []api.NodeCondition{
+				{
+					Kind:   api.NodeReady,
+					Status: api.ConditionNone,
+				},
+			},
+		},
+		{
+			node: newNode("node1"),
+			fakeKubeletClient: &FakeKubeletClient{
+				Status: probe.Failure,
+				Err:    errors.New("Error"),
+			},
+			expectedConditions: []api.NodeCondition{
+				{
+					Kind:   api.NodeReady,
+					Status: api.ConditionUnknown,
+				},
+			},
+		},
 	}
-	if fakeNodeHandler.RequestCount != 2 {
-		t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
-	}
-	if len(fakeNodeHandler.DeletedNodes) != 1 {
-		t.Errorf("expect only 1 node deleted, got %v", len(fakeNodeHandler.DeletedNodes))
-	}
-	if fakeNodeHandler.DeletedNodes[0].Name != "node1" {
-		t.Errorf("unexpect node %v created", fakeNodeHandler.DeletedNodes[0].Name)
+	for _, item := range table {
+		nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient)
+		conditions := nodeController.DoCheck(item.node)
+		if !reflect.DeepEqual(item.expectedConditions, conditions) {
+			t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions)
+		}
 	}
 }
 
-func TestSyncCloudRegexp(t *testing.T) {
-	fakeNodeHandler := &FakeNodeHandler{
-		Existing: []*api.Node{newNode("node0")},
-	}
-	instances := []string{"node0", "node1", "fake"}
-	fakeCloud := fake_cloud.FakeCloud{
-		Machines: instances,
-	}
-	nodeController := NewNodeController(&fakeCloud, "node[0-9]+", nil, nil, fakeNodeHandler)
-	if err := nodeController.SyncCloud(); err != nil {
-		t.Errorf("unexpected error: %v", err)
+func TestSyncNodeStatus(t *testing.T) {
+	table := []struct {
+		fakeNodeHandler      *FakeNodeHandler
+		fakeKubeletClient    *FakeKubeletClient
+		expectedNodes        []*api.Node
+		expectedRequestCount int
+	}{
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{newNode("node0"), newNode("node1")},
+			},
+			fakeKubeletClient: &FakeKubeletClient{
+				Status: probe.Success,
+				Err:    nil,
+			},
+			expectedNodes: []*api.Node{
+				{
+					ObjectMeta: api.ObjectMeta{Name: "node0"},
+					Status:     api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}},
+				},
+				{
+					ObjectMeta: api.ObjectMeta{Name: "node1"},
+					Status:     api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}},
+				},
+			},
+			expectedRequestCount: 3, // List + 2xUpdate
+		},
 	}
-	if fakeNodeHandler.RequestCount != 2 {
-		t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
+	for _, item := range table {
+		nodeController := NewNodeController(nil, "", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient)
+		if err := nodeController.SyncNodeStatus(); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
+			t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
+		}
+		if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
+			t.Errorf("expected nodes %+v, got %+v", item.expectedNodes, item.fakeNodeHandler.UpdatedNodes)
+		}
+		item.fakeNodeHandler.RequestCount = 0
+		if err := nodeController.SyncNodeStatus(); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if item.fakeNodeHandler.RequestCount != 1 {
+			t.Errorf("expected one List when status is unchanged, but got %v calls.", item.fakeNodeHandler.RequestCount)
+		}
 	}
-	if len(fakeNodeHandler.CreatedNodes) != 1 {
-		t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
-	}
-	if fakeNodeHandler.CreatedNodes[0].Name != "node1" {
-		t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
+}
+
+func newNode(name string) *api.Node {
+	return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}}
+}
+
+func sortedNodeNames(nodes []*api.Node) []string {
+	nodeNames := []string{}
+	for _, node := range nodes {
+		nodeNames = append(nodeNames, node.Name)
 	}
+	sort.Strings(nodeNames)
+	return nodeNames
 }
 
 func contains(node *api.Node, nodes []*api.Node) bool {
diff --git a/pkg/master/master.go b/pkg/master/master.go
index c086a118315..20bf50bbb2f 100644
--- a/pkg/master/master.go
+++ b/pkg/master/master.go
@@ -68,7 +68,6 @@ type Config struct {
 	Client             *client.Client
 	Cloud              cloudprovider.Interface
 	EtcdHelper         tools.EtcdHelper
-	HealthCheckMinions bool
 	EventTTL           time.Duration
 	MinionRegexp       string
 	KubeletClient      client.KubeletClient
@@ -235,7 +234,7 @@ func setDefaults(c *Config) {
 // any unhandled paths to "Handler".
 func New(c *Config) *Master {
 	setDefaults(c)
-	minionRegistry := makeMinionRegistry(c)
+	minionRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
 	serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
 	boundPodFactory := &pod.BasicBoundPodFactory{
 		ServiceRegistry: serviceRegistry,
@@ -330,15 +329,6 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) {
 	glog.Errorln(buffer.String())
 }
 
-func makeMinionRegistry(c *Config) minion.Registry {
-	var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil)
-	// TODO: plumb in nodeIPCache here
-	if c.HealthCheckMinions {
-		minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient, util.RealClock{}, 20*time.Second)
-	}
-	return minionRegistry
-}
-
 // init initializes master.
 func (m *Master) init(c *Config) {
 	var userContexts = handlers.NewUserRequestContext()
diff --git a/pkg/probe/http/http.go b/pkg/probe/http/http.go
index fee63737cfc..e3e7898dbc2 100644
--- a/pkg/probe/http/http.go
+++ b/pkg/probe/http/http.go
@@ -45,8 +45,8 @@ type HTTPGetInterface interface {
 }
 
 // DoHTTPProbe checks if a GET request to the url succeeds.
-// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Healthy.
-// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Unhealthy.
+// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success.
+// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure.
 // This is exported because some other packages may want to do direct HTTP probes.
 func DoHTTPProbe(url string, client HTTPGetInterface) (probe.Status, error) {
 	res, err := client.Get(url)
diff --git a/pkg/probe/tcp/tcp.go b/pkg/probe/tcp/tcp.go
index 206aa74a13e..a452a802c74 100644
--- a/pkg/probe/tcp/tcp.go
+++ b/pkg/probe/tcp/tcp.go
@@ -36,8 +36,8 @@ func (pr TCPProber) Probe(host string, port int) (probe.Status, error) {
 }
 
 // DoTCPProbe checks that a TCP socket to the address can be opened.
-// If the socket can be opened, it returns Healthy.
-// If the socket fails to open, it returns Unhealthy.
+// If the socket can be opened, it returns Success.
+// If the socket fails to open, it returns Failure.
 // This is exported because some other packages may want to do direct TCP probes.
 func DoTCPProbe(addr string) (probe.Status, error) {
 	conn, err := net.Dial("tcp", addr)
diff --git a/pkg/registry/minion/healthy_registry.go b/pkg/registry/minion/healthy_registry.go
deleted file mode 100644
index 2ce68129e11..00000000000
--- a/pkg/registry/minion/healthy_registry.go
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Copyright 2014 Google Inc. All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package minion
-
-import (
-	"sync"
-	"time"
-
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
-
-	"github.com/golang/glog"
-)
-
-type HealthyRegistry struct {
-	delegate Registry
-	client   client.KubeletHealthChecker
-	cache    util.TimeCache
-}
-
-func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker, clock util.Clock, ttl time.Duration) Registry {
-	h := &HealthyRegistry{
-		delegate: delegate,
-		client:   client,
-	}
-	h.cache = util.NewTimeCache(clock, ttl, h.doCheck)
-	return h
-}
-
-func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) {
-	minion, err := r.delegate.GetMinion(ctx, minionID)
-	if err != nil {
-		return nil, err
-	}
-	return r.checkMinion(minion), nil
-}
-
-func (r *HealthyRegistry) DeleteMinion(ctx api.Context, minionID string) error {
-	return r.delegate.DeleteMinion(ctx, minionID)
-}
-
-func (r *HealthyRegistry) CreateMinion(ctx api.Context, minion *api.Node) error {
-	return r.delegate.CreateMinion(ctx, minion)
-}
-
-func (r *HealthyRegistry) UpdateMinion(ctx api.Context, minion *api.Node) error {
-	return r.delegate.UpdateMinion(ctx, minion)
-}
-
-func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.NodeList, err error) {
-	list, err := r.delegate.ListMinions(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	// In case the cache is empty, health check in parallel instead of serially.
-	var wg sync.WaitGroup
-	wg.Add(len(list.Items))
-	for i := range list.Items {
-		go func(i int) {
-			list.Items[i] = *r.checkMinion(&list.Items[i])
-			wg.Done()
-		}(i)
-	}
-	wg.Wait()
-	return list, nil
-}
-
-func (r *HealthyRegistry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
-	w, err := r.delegate.WatchMinions(ctx, label, field, resourceVersion)
-	if err != nil {
-		return nil, err
-	}
-	return watch.Filter(w, watch.FilterFunc(func(in watch.Event) (watch.Event, bool) {
-		if node, ok := in.Object.(*api.Node); ok && node != nil {
-			in.Object = r.checkMinion(node)
-		}
-		return in, true
-	})), nil
-}
-
-func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node {
-	condition := r.cache.Get(node.Name).(api.NodeConditionStatus)
-	// TODO: distinguish other conditions like Reachable/Live, and begin storing this
-	// data on nodes directly via sync loops.
-	node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
-		Kind:   api.NodeReady,
-		Status: condition,
-	})
-	return node
-}
-
-// This is called to fill the cache.
-func (r *HealthyRegistry) doCheck(key string) util.T {
-	var nodeStatus api.NodeConditionStatus
-	switch status, err := r.client.HealthCheck(key); {
-	case err != nil:
-		glog.V(2).Infof("HealthyRegistry: node %q health check error: %v", key, err)
-		nodeStatus = api.ConditionUnknown
-	case status == probe.Failure:
-		nodeStatus = api.ConditionNone
-	default:
-		nodeStatus = api.ConditionFull
-	}
-	glog.V(3).Infof("HealthyRegistry: node %q status was %q", key, nodeStatus)
-	return nodeStatus
-}
diff --git a/pkg/registry/minion/healthy_registry_test.go b/pkg/registry/minion/healthy_registry_test.go
deleted file mode 100644
index 4bbfa728334..00000000000
--- a/pkg/registry/minion/healthy_registry_test.go
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-Copyright 2014 Google Inc. All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package minion
-
-import (
-	"reflect"
-	"testing"
-	"time"
-
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
-)
-
-type alwaysYes struct{}
-
-func (alwaysYes) HealthCheck(host string) (probe.Status, error) {
-	return probe.Success, nil
-}
-
-func TestBasicDelegation(t *testing.T) {
-	ctx := api.NewContext()
-	mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
-	healthy := NewHealthyRegistry(
-		mockMinionRegistry,
-		alwaysYes{},
-		&util.FakeClock{},
-		60*time.Second,
-	)
-	list, err := healthy.ListMinions(ctx)
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if !reflect.DeepEqual(list, &mockMinionRegistry.Minions) {
-		t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list)
-	}
-	err = healthy.CreateMinion(ctx, &api.Node{
-		ObjectMeta: api.ObjectMeta{Name: "foo"},
-	})
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	minion, err := healthy.GetMinion(ctx, "m1")
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if minion == nil {
-		t.Errorf("Unexpected absence of 'm1'")
-	}
-	minion, err = healthy.GetMinion(ctx, "m5")
-	if err == nil {
-		t.Errorf("unexpected non-error")
-	}
-	if minion != nil {
-		t.Errorf("Unexpected presence of 'm5'")
-	}
-}
-
-type notMinion struct {
-	minion string
-}
-
-func (n *notMinion) HealthCheck(host string) (probe.Status, error) {
-	if host != n.minion {
-		return probe.Success, nil
-	} else {
-		return probe.Failure, nil
-	}
-}
-
-func TestFiltering(t *testing.T) {
-	ctx := api.NewContext()
-	mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
-	healthy := NewHealthyRegistry(
-		mockMinionRegistry,
-		&notMinion{minion: "m1"},
-		&util.FakeClock{},
-		60*time.Second,
-	)
-	expected := []string{"m1", "m2", "m3"}
-	list, err := healthy.ListMinions(ctx)
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	expectedMinions := registrytest.MakeMinionList(expected, api.NodeResources{})
-	expectedMinions.Items[0].Status.Conditions = []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionNone}}
-	expectedMinions.Items[1].Status.Conditions = []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}
-	expectedMinions.Items[2].Status.Conditions = []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}
-	if !reflect.DeepEqual(list, expectedMinions) {
-		t.Errorf("Expected %v, Got %v", expected, list)
-	}
-	minion, err := healthy.GetMinion(ctx, "m1")
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if minion == nil {
-		t.Errorf("Unexpected empty 'm1'")
-	}
-}
diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go
index 195fa632598..a18eade0518 100644
--- a/pkg/registry/minion/rest_test.go
+++ b/pkg/registry/minion/rest_test.go
@@ -18,13 +18,11 @@ package minion
 
 import (
 	"testing"
-	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 )
 
 func TestMinionRegistryREST(t *testing.T) {
@@ -89,57 +87,6 @@ func TestMinionRegistryREST(t *testing.T) {
 	}
 }
 
-func TestMinionRegistryHealthCheck(t *testing.T) {
-	minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{})
-	minionHealthRegistry := NewHealthyRegistry(
-		minionRegistry,
-		&notMinion{minion: "m1"},
-		&util.FakeClock{},
-		60*time.Second,
-	)
-
-	ms := NewREST(minionHealthRegistry)
-	ctx := api.NewContext()
-
-	c, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "m1"}})
-	if err != nil {
-		t.Fatalf("insert failed: %v", err)
-	}
-	result := <-c
-	if m, ok := result.Object.(*api.Node); !ok || m.Name != "m1" {
-		t.Errorf("insert return value was weird: %#v", result)
-	}
-	if _, err := ms.Get(ctx, "m1"); err != nil {
-		t.Errorf("node is unhealthy, expect no error: %v", err)
-	}
-}
-
-func contains(nodes *api.NodeList, nodeID string) bool {
-	for _, node := range nodes.Items {
-		if node.Name == nodeID {
-			return true
-		}
-	}
-	return false
-}
-
-func TestMinionRegistryInvalidUpdate(t *testing.T) {
-	storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
-	ctx := api.NewContext()
-	obj, err := storage.Get(ctx, "foo")
-	if err != nil {
-		t.Errorf("Unexpected error: %v", err)
-	}
-	minion, ok := obj.(*api.Node)
-	if !ok {
-		t.Fatalf("Object is not a minion: %#v", obj)
-	}
-	minion.Status.HostIP = "1.2.3.4"
-	if _, err = storage.Update(ctx, minion); err == nil {
-		t.Error("Unexpected non-error.")
-	}
-}
-
 func TestMinionRegistryValidUpdate(t *testing.T) {
 	storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
 	ctx := api.NewContext()
@@ -192,3 +139,12 @@ func TestMinionRegistryValidatesCreate(t *testing.T) {
 		}
 	}
 }
+
+func contains(nodes *api.NodeList, nodeID string) bool {
+	for _, node := range nodes.Items {
+		if node.Name == nodeID {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go
index 4aa110090e5..2696f86ea95 100644
--- a/pkg/standalone/standalone.go
+++ b/pkg/standalone/standalone.go
@@ -36,6 +36,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -134,8 +135,9 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 			api.ResourceMemory: *resource.NewQuantity(nodeMemory, resource.BinarySI),
 		},
 	}
-	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl)
-	nodeController.Run(10 * time.Second)
+	kubeletClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
+	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeletClient)
+	nodeController.Run(10*time.Second, 10)
 
 	endpoints := service.NewEndpointController(cl)
 	go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index d8410c4a2f4..a6f07d4d5c2 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -205,9 +205,8 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
 			nodes.Items = append(nodes.Items, node)
 		}
 	} else {
-		// If no condition is set, either node health check is disabled (master
-		// flag "healthCheckMinions" is set to false), or we get unknown condition.
-		// In such cases, we add nodes unconditionally.
+		// If no condition is set, we get an unknown node condition. In that case,
+		// add the node unconditionally.
 		nodes.Items = append(nodes.Items, node)
 	}
diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go
index 6cf9969f82c..a2ca1471291 100644
--- a/test/integration/auth_test.go
+++ b/test/integration/auth_test.go
@@ -203,7 +203,7 @@ func getTestRequests() []struct {
 		// Normal methods on services
 		{"GET", "/api/v1beta1/services", "", code200},
 		{"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200},
-		{"PUT", "/api/v1beta1/services/a" + timeoutFlag, aService, code409}, // TODO: GET and put back server-provided fields to avoid a 422
+		{"PUT", "/api/v1beta1/services/a" + timeoutFlag, aService, code409}, // See #2115 about why 409
 		{"GET", "/api/v1beta1/services", "", code200},
 		{"GET", "/api/v1beta1/services/a", "", code200},
 		{"DELETE", "/api/v1beta1/services/a" + timeoutFlag, "", code200},
@@ -227,7 +227,7 @@ func getTestRequests() []struct {
 		// Normal methods on minions
 		{"GET", "/api/v1beta1/minions", "", code200},
 		{"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code200},
-		{"PUT", "/api/v1beta1/minions/a" + timeoutFlag, aMinion, code422}, // TODO: GET and put back server-provided fields to avoid a 422
+		{"PUT", "/api/v1beta1/minions/a" + timeoutFlag, aMinion, code409}, // See #2115 about why 409
 		{"GET", "/api/v1beta1/minions", "", code200},
 		{"GET", "/api/v1beta1/minions/a", "", code200},
 		{"DELETE", "/api/v1beta1/minions/a" + timeoutFlag, "", code200},