mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Sync node status from node controller to master.
This commit is contained in:
parent
e619f303d7
commit
c793c4f0ab
@ -184,8 +184,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
|||||||
controllerManager.Run(10 * time.Minute)
|
controllerManager.Run(10 * time.Minute)
|
||||||
|
|
||||||
nodeResources := &api.NodeResources{}
|
nodeResources := &api.NodeResources{}
|
||||||
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl)
|
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{})
|
||||||
nodeController.Run(10 * time.Second)
|
nodeController.Run(10*time.Second, 10)
|
||||||
|
|
||||||
// Kubelet (localhost)
|
// Kubelet (localhost)
|
||||||
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
|
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
|
||||||
|
@ -184,7 +184,6 @@ func main() {
|
|||||||
Client: client,
|
Client: client,
|
||||||
Cloud: cloud,
|
Cloud: cloud,
|
||||||
EtcdHelper: helper,
|
EtcdHelper: helper,
|
||||||
HealthCheckMinions: *healthCheckMinions,
|
|
||||||
EventTTL: *eventTTL,
|
EventTTL: *eventTTL,
|
||||||
KubeletClient: kubeletClient,
|
KubeletClient: kubeletClient,
|
||||||
PortalNet: &n,
|
PortalNet: &n,
|
||||||
|
@ -53,18 +53,22 @@ var (
|
|||||||
nodeSyncPeriod = flag.Duration("node_sync_period", 10*time.Second, ""+
|
nodeSyncPeriod = flag.Duration("node_sync_period", 10*time.Second, ""+
|
||||||
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
|
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
|
||||||
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
|
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
|
||||||
|
resourceQuotaSyncPeriod = flag.Duration("resource_quota_sync_period", 10*time.Second, "The period for syncing quota usage status in the system")
|
||||||
|
registerRetryCount = flag.Int("register_retry_count", 10, ""+
|
||||||
|
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
|
||||||
machineList util.StringList
|
machineList util.StringList
|
||||||
// TODO: Discover these by pinging the host machines, and rip out these flags.
|
// TODO: Discover these by pinging the host machines, and rip out these flags.
|
||||||
// TODO: in the meantime, use resource.QuantityFlag() instead of these
|
// TODO: in the meantime, use resource.QuantityFlag() instead of these
|
||||||
nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
|
nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
|
||||||
nodeMemory = resource.QuantityFlag("node_memory", "3Gi", "The amount of memory (in bytes) provisioned on each node")
|
nodeMemory = resource.QuantityFlag("node_memory", "3Gi", "The amount of memory (in bytes) provisioned on each node")
|
||||||
resourceQuotaSyncPeriod = flag.Duration("resource_quota_sync_period", 10*time.Second, "The period for syncing quota usage status in the system")
|
kubeletConfig = client.KubeletConfig{Port: ports.KubeletPort, EnableHttps: false}
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
|
flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
|
||||||
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
|
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
|
||||||
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
|
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
|
||||||
|
client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyMinionFlags() {
|
func verifyMinionFlags() {
|
||||||
@ -104,6 +108,10 @@ func main() {
|
|||||||
controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
|
controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
|
||||||
controllerManager.Run(10 * time.Second)
|
controllerManager.Run(10 * time.Second)
|
||||||
|
|
||||||
|
kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("Failure to start kubelet client: %v", err)
|
||||||
|
}
|
||||||
cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile)
|
cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile)
|
||||||
nodeResources := &api.NodeResources{
|
nodeResources := &api.NodeResources{
|
||||||
Capacity: api.ResourceList{
|
Capacity: api.ResourceList{
|
||||||
@ -111,8 +119,8 @@ func main() {
|
|||||||
api.ResourceMemory: *nodeMemory,
|
api.ResourceMemory: *nodeMemory,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
nodeController := nodeControllerPkg.NewNodeController(cloud, *minionRegexp, machineList, nodeResources, kubeClient)
|
nodeController := nodeControllerPkg.NewNodeController(cloud, *minionRegexp, machineList, nodeResources, kubeClient, kubeletClient)
|
||||||
nodeController.Run(*nodeSyncPeriod)
|
nodeController.Run(*nodeSyncPeriod, *registerRetryCount)
|
||||||
|
|
||||||
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
|
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
|
||||||
resourceQuotaManager.Run(*resourceQuotaSyncPeriod)
|
resourceQuotaManager.Run(*resourceQuotaSyncPeriod)
|
||||||
|
@ -673,9 +673,11 @@ func ValidateMinionUpdate(oldMinion *api.Node, minion *api.Node) errs.Validation
|
|||||||
allErrs := errs.ValidationErrorList{}
|
allErrs := errs.ValidationErrorList{}
|
||||||
allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldMinion.ObjectMeta, &minion.ObjectMeta).Prefix("metadata")...)
|
allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldMinion.ObjectMeta, &minion.ObjectMeta).Prefix("metadata")...)
|
||||||
|
|
||||||
if !api.Semantic.DeepEqual(minion.Status, api.NodeStatus{}) {
|
// TODO: Enable the code once we have better api object.status update model. Currently,
|
||||||
allErrs = append(allErrs, errs.NewFieldInvalid("status", minion.Status, "status must be empty"))
|
// anyone can update node status.
|
||||||
}
|
// if !api.Semantic.DeepEqual(minion.Status, api.NodeStatus{}) {
|
||||||
|
// allErrs = append(allErrs, errs.NewFieldInvalid("status", minion.Status, "status must be empty"))
|
||||||
|
// }
|
||||||
|
|
||||||
// TODO: move reset function to its own location
|
// TODO: move reset function to its own location
|
||||||
// Ignore metadata changes now that they have been tested
|
// Ignore metadata changes now that they have been tested
|
||||||
|
@ -1476,20 +1476,6 @@ func TestValidateMinionUpdate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, true},
|
}, true},
|
||||||
{api.Node{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Labels: map[string]string{"bar": "foo"},
|
|
||||||
},
|
|
||||||
}, api.Node{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "foo",
|
|
||||||
Labels: map[string]string{"bar": "fooobaz"},
|
|
||||||
},
|
|
||||||
Status: api.NodeStatus{
|
|
||||||
HostIP: "1.2.3.4",
|
|
||||||
},
|
|
||||||
}, false},
|
|
||||||
{api.Node{
|
{api.Node{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
|
@ -17,68 +17,233 @@ limitations under the License.
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrRegistration = errors.New("unable to register all nodes.")
|
||||||
|
)
|
||||||
|
|
||||||
type NodeController struct {
|
type NodeController struct {
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
matchRE string
|
matchRE string
|
||||||
staticResources *api.NodeResources
|
staticResources *api.NodeResources
|
||||||
nodes []string
|
nodes []string
|
||||||
kubeClient client.Interface
|
kubeClient client.Interface
|
||||||
|
kubeletClient client.KubeletHealthChecker
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNodeController returns a new node controller to sync instances from cloudprovider.
|
// NewNodeController returns a new node controller to sync instances from cloudprovider.
|
||||||
|
// TODO: NodeController health checker should be a separate package other than
|
||||||
|
// kubeletclient, node health check != kubelet health check.
|
||||||
func NewNodeController(
|
func NewNodeController(
|
||||||
cloud cloudprovider.Interface,
|
cloud cloudprovider.Interface,
|
||||||
matchRE string,
|
matchRE string,
|
||||||
nodes []string,
|
nodes []string,
|
||||||
staticResources *api.NodeResources,
|
staticResources *api.NodeResources,
|
||||||
kubeClient client.Interface) *NodeController {
|
kubeClient client.Interface,
|
||||||
|
kubeletClient client.KubeletHealthChecker) *NodeController {
|
||||||
return &NodeController{
|
return &NodeController{
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
matchRE: matchRE,
|
matchRE: matchRE,
|
||||||
nodes: nodes,
|
nodes: nodes,
|
||||||
staticResources: staticResources,
|
staticResources: staticResources,
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
|
kubeletClient: kubeletClient,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts syncing instances from cloudprovider periodically, or create initial node list.
|
// Run creates initial node list and start syncing instances from cloudprovider if any.
|
||||||
func (s *NodeController) Run(period time.Duration) {
|
// It also starts syncing cluster node status.
|
||||||
if s.cloud != nil && len(s.matchRE) > 0 {
|
func (s *NodeController) Run(period time.Duration, retryCount int) {
|
||||||
|
// Register intial set of nodes with their status set.
|
||||||
|
var nodes *api.NodeList
|
||||||
|
var err error
|
||||||
|
if s.isRunningCloudProvider() {
|
||||||
|
nodes, err = s.CloudNodes()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error loading initial node from cloudprovider: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
nodes, err = s.StaticNodes()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error loading initial static nodes")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodes = s.DoChecks(nodes)
|
||||||
|
if err := s.RegisterNodes(nodes, retryCount, period); err != nil {
|
||||||
|
glog.Errorf("Error registrying node list: %+v", nodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start syncing node list from cloudprovider.
|
||||||
|
if s.isRunningCloudProvider() {
|
||||||
go util.Forever(func() {
|
go util.Forever(func() {
|
||||||
if err := s.SyncCloud(); err != nil {
|
if err = s.SyncCloud(); err != nil {
|
||||||
glog.Errorf("Error syncing cloud: %v", err)
|
glog.Errorf("Error syncing cloud: %v", err)
|
||||||
}
|
}
|
||||||
}, period)
|
}, period)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start syncing node status.
|
||||||
|
go util.Forever(func() {
|
||||||
|
if err = s.SyncNodeStatus(); err != nil {
|
||||||
|
glog.Errorf("Error syncing status: %v", err)
|
||||||
|
}
|
||||||
|
}, period)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
|
||||||
|
func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
|
||||||
|
registered := util.NewStringSet()
|
||||||
|
for i := 0; i < retryCount; i++ {
|
||||||
|
for _, node := range nodes.Items {
|
||||||
|
if registered.Has(node.Name) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err := s.kubeClient.Nodes().Create(&node)
|
||||||
|
if err == nil {
|
||||||
|
registered.Insert(node.Name)
|
||||||
|
glog.Infof("Registered node in registry: %s", node.Name)
|
||||||
} else {
|
} else {
|
||||||
go s.SyncStatic(period)
|
glog.Errorf("Error registrying node %s, retrying: %s", node.Name, err)
|
||||||
|
}
|
||||||
|
if registered.Len() == len(nodes.Items) {
|
||||||
|
glog.Infof("Successfully Registered all nodes")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(retryInterval)
|
||||||
|
}
|
||||||
|
if registered.Len() != len(nodes.Items) {
|
||||||
|
return ErrRegistration
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncStatic registers list of machines from command line flag. It returns after successful
|
// SyncCloud synchronizes the list of instances from cloudprovider to master server.
|
||||||
// registration of all machines.
|
func (s *NodeController) SyncCloud() error {
|
||||||
func (s *NodeController) SyncStatic(period time.Duration) error {
|
matches, err := s.CloudNodes()
|
||||||
registered := util.NewStringSet()
|
if err != nil {
|
||||||
for {
|
return err
|
||||||
for _, nodeID := range s.nodes {
|
}
|
||||||
if registered.Has(nodeID) {
|
nodes, err := s.kubeClient.Nodes().List()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nodeMap := make(map[string]*api.Node)
|
||||||
|
for _, node := range nodes.Items {
|
||||||
|
nodeMap[node.Name] = &node
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create nodes which have been created in cloud, but not in kubernetes cluster.
|
||||||
|
for _, node := range matches.Items {
|
||||||
|
if _, ok := nodeMap[node.Name]; !ok {
|
||||||
|
glog.Infof("Create node in registry: %s", node.Name)
|
||||||
|
_, err = s.kubeClient.Nodes().Create(&node)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Create node error: %s", node.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(nodeMap, node.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
|
||||||
|
for nodeID := range nodeMap {
|
||||||
|
glog.Infof("Delete node from registry: %s", nodeID)
|
||||||
|
err = s.kubeClient.Nodes().Delete(nodeID)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Delete node error: %s", nodeID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncNodeStatus synchronizes cluster nodes status to master server.
|
||||||
|
func (s *NodeController) SyncNodeStatus() error {
|
||||||
|
nodes, err := s.kubeClient.Nodes().List()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
oldNodes := make(map[string]api.Node)
|
||||||
|
for _, node := range nodes.Items {
|
||||||
|
oldNodes[node.Name] = node
|
||||||
|
}
|
||||||
|
nodes = s.DoChecks(nodes)
|
||||||
|
for _, node := range nodes.Items {
|
||||||
|
if reflect.DeepEqual(node, oldNodes[node.Name]) {
|
||||||
|
glog.V(2).Infof("skip updating node %v", node.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
node := &api.Node{
|
glog.V(2).Infof("updating node %v", node.Name)
|
||||||
|
_, err = s.kubeClient.Nodes().Update(&node)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("error updating node %s: %v", node.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DoChecks performs health checking for given list of nodes.
|
||||||
|
func (s *NodeController) DoChecks(nodes *api.NodeList) *api.NodeList {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(nodes.Items))
|
||||||
|
for i := range nodes.Items {
|
||||||
|
go func(node *api.Node) {
|
||||||
|
node.Status.Conditions = s.DoCheck(node)
|
||||||
|
wg.Done()
|
||||||
|
}(&nodes.Items[i])
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
// DoCheck performs health checking for given node.
|
||||||
|
func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
|
||||||
|
var conditions []api.NodeCondition
|
||||||
|
switch status, err := s.kubeletClient.HealthCheck(node.Name); {
|
||||||
|
case err != nil:
|
||||||
|
glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
|
||||||
|
conditions = append(conditions, api.NodeCondition{
|
||||||
|
Kind: api.NodeReady,
|
||||||
|
Status: api.ConditionUnknown,
|
||||||
|
})
|
||||||
|
case status == probe.Failure:
|
||||||
|
conditions = append(conditions, api.NodeCondition{
|
||||||
|
Kind: api.NodeReady,
|
||||||
|
Status: api.ConditionNone,
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
conditions = append(conditions, api.NodeCondition{
|
||||||
|
Kind: api.NodeReady,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
glog.V(5).Infof("NodeController: node %q status was %+v", node.Name, conditions)
|
||||||
|
return conditions
|
||||||
|
}
|
||||||
|
|
||||||
|
// StaticNodes constructs and returns api.NodeList for static nodes. If error
|
||||||
|
// occurs, an empty NodeList will be returned with a non-nil error info.
|
||||||
|
func (s *NodeController) StaticNodes() (*api.NodeList, error) {
|
||||||
|
result := &api.NodeList{}
|
||||||
|
for _, nodeID := range s.nodes {
|
||||||
|
node := api.Node{
|
||||||
ObjectMeta: api.ObjectMeta{Name: nodeID},
|
ObjectMeta: api.ObjectMeta{Name: nodeID},
|
||||||
Spec: api.NodeSpec{
|
Spec: api.NodeSpec{Capacity: s.staticResources.Capacity},
|
||||||
Capacity: s.staticResources.Capacity,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
addr := net.ParseIP(nodeID)
|
addr := net.ParseIP(nodeID)
|
||||||
if addr != nil {
|
if addr != nil {
|
||||||
@ -93,75 +258,31 @@ func (s *NodeController) SyncStatic(period time.Duration) error {
|
|||||||
node.Status.HostIP = addrs[0].String()
|
node.Status.HostIP = addrs[0].String()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err := s.kubeClient.Nodes().Create(node)
|
result.Items = append(result.Items, node)
|
||||||
if err == nil {
|
|
||||||
registered.Insert(nodeID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if registered.Len() == len(s.nodes) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
time.Sleep(period)
|
|
||||||
}
|
}
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncCloud syncs list of instances from cloudprovider to master etcd registry.
|
// CloudNodes constructs and returns api.NodeList from cloudprovider. If error
|
||||||
func (s *NodeController) SyncCloud() error {
|
// occurs, an empty NodeList will be returned with a non-nil error info.
|
||||||
matches, err := s.cloudNodes()
|
func (s *NodeController) CloudNodes() (*api.NodeList, error) {
|
||||||
if err != nil {
|
result := &api.NodeList{}
|
||||||
return err
|
|
||||||
}
|
|
||||||
nodes, err := s.kubeClient.Nodes().List()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
nodeMap := make(map[string]*api.Node)
|
|
||||||
for _, node := range nodes.Items {
|
|
||||||
nodeMap[node.Name] = &node
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create or delete nodes from registry.
|
|
||||||
for _, node := range matches.Items {
|
|
||||||
if _, ok := nodeMap[node.Name]; !ok {
|
|
||||||
glog.Infof("Create node in registry: %s", node.Name)
|
|
||||||
_, err = s.kubeClient.Nodes().Create(&node)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Create node error: %s", node.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
delete(nodeMap, node.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
for nodeID := range nodeMap {
|
|
||||||
glog.Infof("Delete node from registry: %s", nodeID)
|
|
||||||
err = s.kubeClient.Nodes().Delete(nodeID)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Delete node error: %s", nodeID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// cloudNodes constructs and returns api.NodeList from cloudprovider.
|
|
||||||
func (s *NodeController) cloudNodes() (*api.NodeList, error) {
|
|
||||||
instances, ok := s.cloud.Instances()
|
instances, ok := s.cloud.Instances()
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("cloud doesn't support instances")
|
return result, fmt.Errorf("cloud doesn't support instances")
|
||||||
}
|
}
|
||||||
matches, err := instances.List(s.matchRE)
|
matches, err := instances.List(s.matchRE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return result, err
|
||||||
}
|
|
||||||
result := &api.NodeList{
|
|
||||||
Items: make([]api.Node, len(matches)),
|
|
||||||
}
|
}
|
||||||
for i := range matches {
|
for i := range matches {
|
||||||
result.Items[i].Name = matches[i]
|
node := api.Node{}
|
||||||
|
node.Name = matches[i]
|
||||||
hostIP, err := instances.IPAddress(matches[i])
|
hostIP, err := instances.IPAddress(matches[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("error getting instance ip address for %s: %v", matches[i], err)
|
glog.Errorf("error getting instance ip address for %s: %v", matches[i], err)
|
||||||
} else {
|
} else {
|
||||||
result.Items[i].Status.HostIP = hostIP.String()
|
node.Status.HostIP = hostIP.String()
|
||||||
}
|
}
|
||||||
resources, err := instances.GetNodeResources(matches[i])
|
resources, err := instances.GetNodeResources(matches[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -171,8 +292,14 @@ func (s *NodeController) cloudNodes() (*api.NodeList, error) {
|
|||||||
resources = s.staticResources
|
resources = s.staticResources
|
||||||
}
|
}
|
||||||
if resources != nil {
|
if resources != nil {
|
||||||
result.Items[i].Spec.Capacity = resources.Capacity
|
node.Spec.Capacity = resources.Capacity
|
||||||
}
|
}
|
||||||
|
result.Items = append(result.Items, node)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isRunningCloudProvider checks if cluster is running with cloud provider.
|
||||||
|
func (s *NodeController) isRunningCloudProvider() bool {
|
||||||
|
return s.cloud != nil && len(s.matchRE) > 0
|
||||||
|
}
|
||||||
|
@ -17,19 +17,22 @@ limitations under the License.
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"reflect"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
|
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newNode(name string) *api.Node {
|
// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface.
|
||||||
return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}}
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakeNodeHandler struct {
|
type FakeNodeHandler struct {
|
||||||
client.Fake
|
client.Fake
|
||||||
client.FakeNodes
|
client.FakeNodes
|
||||||
@ -41,6 +44,7 @@ type FakeNodeHandler struct {
|
|||||||
// Output
|
// Output
|
||||||
CreatedNodes []*api.Node
|
CreatedNodes []*api.Node
|
||||||
DeletedNodes []*api.Node
|
DeletedNodes []*api.Node
|
||||||
|
UpdatedNodes []*api.Node
|
||||||
RequestCount int
|
RequestCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,7 +55,8 @@ func (c *FakeNodeHandler) Nodes() client.NodeInterface {
|
|||||||
func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
|
func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
|
||||||
defer func() { m.RequestCount++ }()
|
defer func() { m.RequestCount++ }()
|
||||||
if m.CreateHook == nil || m.CreateHook(m, node) {
|
if m.CreateHook == nil || m.CreateHook(m, node) {
|
||||||
m.CreatedNodes = append(m.CreatedNodes, node)
|
nodeCopy := *node
|
||||||
|
m.CreatedNodes = append(m.CreatedNodes, &nodeCopy)
|
||||||
return node, nil
|
return node, nil
|
||||||
} else {
|
} else {
|
||||||
return nil, fmt.Errorf("Create error.")
|
return nil, fmt.Errorf("Create error.")
|
||||||
@ -60,18 +65,27 @@ func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
|
|||||||
|
|
||||||
func (m *FakeNodeHandler) List() (*api.NodeList, error) {
|
func (m *FakeNodeHandler) List() (*api.NodeList, error) {
|
||||||
defer func() { m.RequestCount++ }()
|
defer func() { m.RequestCount++ }()
|
||||||
nodes := []api.Node{}
|
var nodes []*api.Node
|
||||||
|
for i := 0; i < len(m.UpdatedNodes); i++ {
|
||||||
|
if !contains(m.UpdatedNodes[i], m.DeletedNodes) {
|
||||||
|
nodes = append(nodes, m.UpdatedNodes[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
for i := 0; i < len(m.Existing); i++ {
|
for i := 0; i < len(m.Existing); i++ {
|
||||||
if !contains(m.Existing[i], m.DeletedNodes) {
|
if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.Existing[i], nodes) {
|
||||||
nodes = append(nodes, *m.Existing[i])
|
nodes = append(nodes, m.Existing[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < len(m.CreatedNodes); i++ {
|
for i := 0; i < len(m.CreatedNodes); i++ {
|
||||||
if !contains(m.Existing[i], m.DeletedNodes) {
|
if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.CreatedNodes[i], nodes) {
|
||||||
nodes = append(nodes, *m.CreatedNodes[i])
|
nodes = append(nodes, m.CreatedNodes[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &api.NodeList{Items: nodes}, nil
|
nodeList := &api.NodeList{}
|
||||||
|
for _, node := range nodes {
|
||||||
|
nodeList.Items = append(nodeList.Items, *node)
|
||||||
|
}
|
||||||
|
return nodeList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *FakeNodeHandler) Delete(id string) error {
|
func (m *FakeNodeHandler) Delete(id string) error {
|
||||||
@ -80,142 +94,377 @@ func (m *FakeNodeHandler) Delete(id string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncStaticCreateNode(t *testing.T) {
|
func (m *FakeNodeHandler) Update(node *api.Node) (*api.Node, error) {
|
||||||
fakeNodeHandler := &FakeNodeHandler{
|
nodeCopy := *node
|
||||||
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
|
m.UpdatedNodes = append(m.UpdatedNodes, &nodeCopy)
|
||||||
return true
|
m.RequestCount++
|
||||||
},
|
return node, nil
|
||||||
}
|
|
||||||
nodeController := NewNodeController(nil, ".*", []string{"node0"}, &api.NodeResources{}, fakeNodeHandler)
|
|
||||||
if err := nodeController.SyncStatic(time.Millisecond); err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if fakeNodeHandler.RequestCount != 1 {
|
|
||||||
t.Errorf("Expected 1 call, but got %v.", fakeNodeHandler.RequestCount)
|
|
||||||
}
|
|
||||||
if len(fakeNodeHandler.CreatedNodes) != 1 {
|
|
||||||
t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
|
|
||||||
}
|
|
||||||
if fakeNodeHandler.CreatedNodes[0].Name != "node0" {
|
|
||||||
t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncStaticCreateNodeWithHostIP(t *testing.T) {
|
// FakeKubeletClient is a fake implementation of KubeletClient.
|
||||||
fakeNodeHandler := &FakeNodeHandler{
|
type FakeKubeletClient struct {
|
||||||
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
|
Status probe.Status
|
||||||
return true
|
Err error
|
||||||
},
|
|
||||||
}
|
|
||||||
nodeController := NewNodeController(nil, ".*", []string{"10.0.0.1"}, &api.NodeResources{}, fakeNodeHandler)
|
|
||||||
if err := nodeController.SyncStatic(time.Millisecond); err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if fakeNodeHandler.CreatedNodes[0].Name != "10.0.0.1" {
|
|
||||||
t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
|
|
||||||
}
|
|
||||||
if fakeNodeHandler.CreatedNodes[0].Status.HostIP != "10.0.0.1" {
|
|
||||||
t.Errorf("unexpect nil node HostIP for node %v", fakeNodeHandler.CreatedNodes[0].Name)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncStaticCreateNodeWithError(t *testing.T) {
|
func (c *FakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.PodStatusResult, error) {
|
||||||
fakeNodeHandler := &FakeNodeHandler{
|
return api.PodStatusResult{}, errors.New("Not Implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FakeKubeletClient) HealthCheck(host string) (probe.Status, error) {
|
||||||
|
return c.Status, c.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegisterNodes(t *testing.T) {
|
||||||
|
table := []struct {
|
||||||
|
fakeNodeHandler *FakeNodeHandler
|
||||||
|
machines []string
|
||||||
|
retryCount int
|
||||||
|
expectedRequestCount int
|
||||||
|
expectedCreateCount int
|
||||||
|
expectedFail bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
// Register two nodes normally.
|
||||||
|
machines: []string{"node0", "node1"},
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
|
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true },
|
||||||
|
},
|
||||||
|
retryCount: 1,
|
||||||
|
expectedRequestCount: 2,
|
||||||
|
expectedCreateCount: 2,
|
||||||
|
expectedFail: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// No machine to register.
|
||||||
|
machines: []string{},
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
|
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true },
|
||||||
|
},
|
||||||
|
retryCount: 1,
|
||||||
|
expectedRequestCount: 0,
|
||||||
|
expectedCreateCount: 0,
|
||||||
|
expectedFail: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Fail the first two requests.
|
||||||
|
machines: []string{"node0", "node1"},
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
|
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
|
||||||
if fake.RequestCount == 0 {
|
if fake.RequestCount == 0 || fake.RequestCount == 1 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
},
|
},
|
||||||
|
},
|
||||||
|
retryCount: 10,
|
||||||
|
expectedRequestCount: 4,
|
||||||
|
expectedCreateCount: 2,
|
||||||
|
expectedFail: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The first node always fails.
|
||||||
|
machines: []string{"node0", "node1"},
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
|
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
|
||||||
|
if node.Name == "node0" {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
nodeController := NewNodeController(nil, ".*", []string{"node0"}, &api.NodeResources{}, fakeNodeHandler)
|
return true
|
||||||
if err := nodeController.SyncStatic(time.Millisecond); err != nil {
|
},
|
||||||
t.Errorf("unexpected error: %v", err)
|
},
|
||||||
|
retryCount: 2,
|
||||||
|
expectedRequestCount: 3, // 2 for node0, 1 for node1
|
||||||
|
expectedCreateCount: 1,
|
||||||
|
expectedFail: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if fakeNodeHandler.RequestCount != 2 {
|
for _, item := range table {
|
||||||
t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
|
nodes := api.NodeList{}
|
||||||
|
for _, machine := range item.machines {
|
||||||
|
nodes.Items = append(nodes.Items, *newNode(machine))
|
||||||
}
|
}
|
||||||
if len(fakeNodeHandler.CreatedNodes) != 1 {
|
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil)
|
||||||
t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
|
err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
|
||||||
|
if !item.expectedFail && err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if item.expectedFail && err == nil {
|
||||||
|
t.Errorf("unexpected non-error")
|
||||||
|
}
|
||||||
|
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
|
||||||
|
t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
|
||||||
|
}
|
||||||
|
if len(item.fakeNodeHandler.CreatedNodes) != item.expectedCreateCount {
|
||||||
|
t.Errorf("expected %v nodes, but got %v.", item.expectedCreateCount, item.fakeNodeHandler.CreatedNodes)
|
||||||
}
|
}
|
||||||
if fakeNodeHandler.CreatedNodes[0].Name != "node0" {
|
|
||||||
t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncCloudCreateNode(t *testing.T) {
|
func TestCreateStaticNodes(t *testing.T) {
|
||||||
fakeNodeHandler := &FakeNodeHandler{
|
table := []struct {
|
||||||
|
machines []string
|
||||||
|
expectedNodes *api.NodeList
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
machines: []string{},
|
||||||
|
expectedNodes: &api.NodeList{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
machines: []string{"node0"},
|
||||||
|
expectedNodes: &api.NodeList{
|
||||||
|
Items: []api.Node{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node0"},
|
||||||
|
Spec: api.NodeSpec{},
|
||||||
|
Status: api.NodeStatus{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil)
|
||||||
|
nodes, err := nodeController.StaticNodes()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(item.expectedNodes, nodes) {
|
||||||
|
t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreateCloudNodes(t *testing.T) {
|
||||||
|
resourceList := api.ResourceList{
|
||||||
|
api.ResourceCPU: *resource.NewMilliQuantity(1000, resource.DecimalSI),
|
||||||
|
api.ResourceMemory: *resource.NewQuantity(3000, resource.DecimalSI),
|
||||||
|
}
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
fakeCloud *fake_cloud.FakeCloud
|
||||||
|
machines []string
|
||||||
|
expectedNodes *api.NodeList
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
fakeCloud: &fake_cloud.FakeCloud{},
|
||||||
|
expectedNodes: &api.NodeList{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
fakeCloud: &fake_cloud.FakeCloud{
|
||||||
|
Machines: []string{"node0"},
|
||||||
|
IP: net.ParseIP("1.2.3.4"),
|
||||||
|
NodeResources: &api.NodeResources{Capacity: resourceList},
|
||||||
|
},
|
||||||
|
expectedNodes: &api.NodeList{
|
||||||
|
Items: []api.Node{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node0"},
|
||||||
|
Spec: api.NodeSpec{Capacity: resourceList},
|
||||||
|
Status: api.NodeStatus{HostIP: "1.2.3.4"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil)
|
||||||
|
nodes, err := nodeController.CloudNodes()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(item.expectedNodes, nodes) {
|
||||||
|
t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncCloud(t *testing.T) {
|
||||||
|
table := []struct {
|
||||||
|
fakeNodeHandler *FakeNodeHandler
|
||||||
|
fakeCloud *fake_cloud.FakeCloud
|
||||||
|
matchRE string
|
||||||
|
expectedRequestCount int
|
||||||
|
expectedCreated []string
|
||||||
|
expectedDeleted []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
Existing: []*api.Node{newNode("node0")},
|
Existing: []*api.Node{newNode("node0")},
|
||||||
}
|
},
|
||||||
instances := []string{"node0", "node1"}
|
fakeCloud: &fake_cloud.FakeCloud{
|
||||||
fakeCloud := fake_cloud.FakeCloud{
|
Machines: []string{"node0", "node1"},
|
||||||
Machines: instances,
|
},
|
||||||
}
|
matchRE: ".*",
|
||||||
nodeController := NewNodeController(&fakeCloud, ".*", nil, nil, fakeNodeHandler)
|
expectedRequestCount: 2, // List + Create
|
||||||
if err := nodeController.SyncCloud(); err != nil {
|
expectedCreated: []string{"node1"},
|
||||||
t.Errorf("unexpected error: %v", err)
|
expectedDeleted: []string{},
|
||||||
}
|
},
|
||||||
|
{
|
||||||
if fakeNodeHandler.RequestCount != 2 {
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
|
|
||||||
}
|
|
||||||
if len(fakeNodeHandler.CreatedNodes) != 1 {
|
|
||||||
t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
|
|
||||||
}
|
|
||||||
if fakeNodeHandler.CreatedNodes[0].Name != "node1" {
|
|
||||||
t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSyncCloudDeleteNode(t *testing.T) {
|
|
||||||
fakeNodeHandler := &FakeNodeHandler{
|
|
||||||
Existing: []*api.Node{newNode("node0"), newNode("node1")},
|
Existing: []*api.Node{newNode("node0"), newNode("node1")},
|
||||||
|
},
|
||||||
|
fakeCloud: &fake_cloud.FakeCloud{
|
||||||
|
Machines: []string{"node0"},
|
||||||
|
},
|
||||||
|
matchRE: ".*",
|
||||||
|
expectedRequestCount: 2, // List + Delete
|
||||||
|
expectedCreated: []string{},
|
||||||
|
expectedDeleted: []string{"node1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
|
Existing: []*api.Node{newNode("node0")},
|
||||||
|
},
|
||||||
|
fakeCloud: &fake_cloud.FakeCloud{
|
||||||
|
Machines: []string{"node0", "node1", "fake"},
|
||||||
|
},
|
||||||
|
matchRE: "node[0-9]+",
|
||||||
|
expectedRequestCount: 2, // List + Create
|
||||||
|
expectedCreated: []string{"node1"},
|
||||||
|
expectedDeleted: []string{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
instances := []string{"node0"}
|
|
||||||
fakeCloud := fake_cloud.FakeCloud{
|
for _, item := range table {
|
||||||
Machines: instances,
|
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil)
|
||||||
}
|
|
||||||
nodeController := NewNodeController(&fakeCloud, ".*", nil, nil, fakeNodeHandler)
|
|
||||||
if err := nodeController.SyncCloud(); err != nil {
|
if err := nodeController.SyncCloud(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
|
||||||
if fakeNodeHandler.RequestCount != 2 {
|
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
|
||||||
t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
|
|
||||||
}
|
}
|
||||||
if len(fakeNodeHandler.DeletedNodes) != 1 {
|
nodes := sortedNodeNames(item.fakeNodeHandler.CreatedNodes)
|
||||||
t.Errorf("expect only 1 node deleted, got %v", len(fakeNodeHandler.DeletedNodes))
|
if !reflect.DeepEqual(item.expectedCreated, nodes) {
|
||||||
|
t.Errorf("expected node list %+v, got %+v", item.expectedCreated, nodes)
|
||||||
|
}
|
||||||
|
nodes = sortedNodeNames(item.fakeNodeHandler.DeletedNodes)
|
||||||
|
if !reflect.DeepEqual(item.expectedDeleted, nodes) {
|
||||||
|
t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes)
|
||||||
}
|
}
|
||||||
if fakeNodeHandler.DeletedNodes[0].Name != "node1" {
|
|
||||||
t.Errorf("unexpect node %v created", fakeNodeHandler.DeletedNodes[0].Name)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncCloudRegexp(t *testing.T) {
|
func TestHealthCheckNode(t *testing.T) {
|
||||||
fakeNodeHandler := &FakeNodeHandler{
|
table := []struct {
|
||||||
Existing: []*api.Node{newNode("node0")},
|
node *api.Node
|
||||||
}
|
fakeKubeletClient *FakeKubeletClient
|
||||||
instances := []string{"node0", "node1", "fake"}
|
expectedConditions []api.NodeCondition
|
||||||
fakeCloud := fake_cloud.FakeCloud{
|
}{
|
||||||
Machines: instances,
|
{
|
||||||
}
|
node: newNode("node0"),
|
||||||
nodeController := NewNodeController(&fakeCloud, "node[0-9]+", nil, nil, fakeNodeHandler)
|
fakeKubeletClient: &FakeKubeletClient{
|
||||||
if err := nodeController.SyncCloud(); err != nil {
|
Status: probe.Success,
|
||||||
t.Errorf("unexpected error: %v", err)
|
Err: nil,
|
||||||
|
},
|
||||||
|
expectedConditions: []api.NodeCondition{
|
||||||
|
{
|
||||||
|
Kind: api.NodeReady,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
node: newNode("node0"),
|
||||||
|
fakeKubeletClient: &FakeKubeletClient{
|
||||||
|
Status: probe.Failure,
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
expectedConditions: []api.NodeCondition{
|
||||||
|
{
|
||||||
|
Kind: api.NodeReady,
|
||||||
|
Status: api.ConditionNone,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
node: newNode("node1"),
|
||||||
|
fakeKubeletClient: &FakeKubeletClient{
|
||||||
|
Status: probe.Failure,
|
||||||
|
Err: errors.New("Error"),
|
||||||
|
},
|
||||||
|
expectedConditions: []api.NodeCondition{
|
||||||
|
{
|
||||||
|
Kind: api.NodeReady,
|
||||||
|
Status: api.ConditionUnknown,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if fakeNodeHandler.RequestCount != 2 {
|
for _, item := range table {
|
||||||
t.Errorf("Expected 2 call, but got %v.", fakeNodeHandler.RequestCount)
|
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient)
|
||||||
|
conditions := nodeController.DoCheck(item.node)
|
||||||
|
if !reflect.DeepEqual(item.expectedConditions, conditions) {
|
||||||
|
t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions)
|
||||||
}
|
}
|
||||||
if len(fakeNodeHandler.CreatedNodes) != 1 {
|
|
||||||
t.Errorf("expect only 1 node created, got %v", len(fakeNodeHandler.CreatedNodes))
|
|
||||||
}
|
}
|
||||||
if fakeNodeHandler.CreatedNodes[0].Name != "node1" {
|
}
|
||||||
t.Errorf("unexpect node %v created", fakeNodeHandler.CreatedNodes[0].Name)
|
|
||||||
|
func TestSyncNodeStatus(t *testing.T) {
|
||||||
|
table := []struct {
|
||||||
|
fakeNodeHandler *FakeNodeHandler
|
||||||
|
fakeKubeletClient *FakeKubeletClient
|
||||||
|
expectedNodes []*api.Node
|
||||||
|
expectedRequestCount int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
|
Existing: []*api.Node{newNode("node0"), newNode("node1")},
|
||||||
|
},
|
||||||
|
fakeKubeletClient: &FakeKubeletClient{
|
||||||
|
Status: probe.Success,
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
expectedNodes: []*api.Node{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node0"},
|
||||||
|
Status: api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node1"},
|
||||||
|
Status: api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedRequestCount: 3, // List + 2xUpdate
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
nodeController := NewNodeController(nil, "", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient)
|
||||||
|
if err := nodeController.SyncNodeStatus(); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
|
||||||
|
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
|
||||||
|
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes, item.fakeNodeHandler.UpdatedNodes)
|
||||||
|
}
|
||||||
|
item.fakeNodeHandler.RequestCount = 0
|
||||||
|
if err := nodeController.SyncNodeStatus(); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if item.fakeNodeHandler.RequestCount != 1 {
|
||||||
|
t.Errorf("expected one list for updating same status, but got %v.", item.fakeNodeHandler.RequestCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNode(name string) *api.Node {
|
||||||
|
return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortedNodeNames(nodes []*api.Node) []string {
|
||||||
|
nodeNames := []string{}
|
||||||
|
for _, node := range nodes {
|
||||||
|
nodeNames = append(nodeNames, node.Name)
|
||||||
|
}
|
||||||
|
sort.Strings(nodeNames)
|
||||||
|
return nodeNames
|
||||||
}
|
}
|
||||||
|
|
||||||
func contains(node *api.Node, nodes []*api.Node) bool {
|
func contains(node *api.Node, nodes []*api.Node) bool {
|
||||||
|
@ -68,7 +68,6 @@ type Config struct {
|
|||||||
Client *client.Client
|
Client *client.Client
|
||||||
Cloud cloudprovider.Interface
|
Cloud cloudprovider.Interface
|
||||||
EtcdHelper tools.EtcdHelper
|
EtcdHelper tools.EtcdHelper
|
||||||
HealthCheckMinions bool
|
|
||||||
EventTTL time.Duration
|
EventTTL time.Duration
|
||||||
MinionRegexp string
|
MinionRegexp string
|
||||||
KubeletClient client.KubeletClient
|
KubeletClient client.KubeletClient
|
||||||
@ -235,7 +234,7 @@ func setDefaults(c *Config) {
|
|||||||
// any unhandled paths to "Handler".
|
// any unhandled paths to "Handler".
|
||||||
func New(c *Config) *Master {
|
func New(c *Config) *Master {
|
||||||
setDefaults(c)
|
setDefaults(c)
|
||||||
minionRegistry := makeMinionRegistry(c)
|
minionRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
|
||||||
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
|
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
|
||||||
boundPodFactory := &pod.BasicBoundPodFactory{
|
boundPodFactory := &pod.BasicBoundPodFactory{
|
||||||
ServiceRegistry: serviceRegistry,
|
ServiceRegistry: serviceRegistry,
|
||||||
@ -330,15 +329,6 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
|
|||||||
glog.Errorln(buffer.String())
|
glog.Errorln(buffer.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeMinionRegistry(c *Config) minion.Registry {
|
|
||||||
var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil)
|
|
||||||
// TODO: plumb in nodeIPCache here
|
|
||||||
if c.HealthCheckMinions {
|
|
||||||
minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient, util.RealClock{}, 20*time.Second)
|
|
||||||
}
|
|
||||||
return minionRegistry
|
|
||||||
}
|
|
||||||
|
|
||||||
// init initializes master.
|
// init initializes master.
|
||||||
func (m *Master) init(c *Config) {
|
func (m *Master) init(c *Config) {
|
||||||
var userContexts = handlers.NewUserRequestContext()
|
var userContexts = handlers.NewUserRequestContext()
|
||||||
|
@ -45,8 +45,8 @@ type HTTPGetInterface interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DoHTTPProbe checks if a GET request to the url succeeds.
|
// DoHTTPProbe checks if a GET request to the url succeeds.
|
||||||
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Healthy.
|
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success.
|
||||||
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Unhealthy.
|
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure.
|
||||||
// This is exported because some other packages may want to do direct HTTP probes.
|
// This is exported because some other packages may want to do direct HTTP probes.
|
||||||
func DoHTTPProbe(url string, client HTTPGetInterface) (probe.Status, error) {
|
func DoHTTPProbe(url string, client HTTPGetInterface) (probe.Status, error) {
|
||||||
res, err := client.Get(url)
|
res, err := client.Get(url)
|
||||||
|
@ -36,8 +36,8 @@ func (pr TCPProber) Probe(host string, port int) (probe.Status, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DoTCPProbe checks that a TCP socket to the address can be opened.
|
// DoTCPProbe checks that a TCP socket to the address can be opened.
|
||||||
// If the socket can be opened, it returns Healthy.
|
// If the socket can be opened, it returns Success
|
||||||
// If the socket fails to open, it returns Unhealthy.
|
// If the socket fails to open, it returns Failure.
|
||||||
// This is exported because some other packages may want to do direct TCP probes.
|
// This is exported because some other packages may want to do direct TCP probes.
|
||||||
func DoTCPProbe(addr string) (probe.Status, error) {
|
func DoTCPProbe(addr string) (probe.Status, error) {
|
||||||
conn, err := net.Dial("tcp", addr)
|
conn, err := net.Dial("tcp", addr)
|
||||||
|
@ -1,125 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2014 Google Inc. All rights reserved.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package minion
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
)
|
|
||||||
|
|
||||||
type HealthyRegistry struct {
|
|
||||||
delegate Registry
|
|
||||||
client client.KubeletHealthChecker
|
|
||||||
cache util.TimeCache
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker, clock util.Clock, ttl time.Duration) Registry {
|
|
||||||
h := &HealthyRegistry{
|
|
||||||
delegate: delegate,
|
|
||||||
client: client,
|
|
||||||
}
|
|
||||||
h.cache = util.NewTimeCache(clock, ttl, h.doCheck)
|
|
||||||
return h
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) {
|
|
||||||
minion, err := r.delegate.GetMinion(ctx, minionID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return r.checkMinion(minion), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *HealthyRegistry) DeleteMinion(ctx api.Context, minionID string) error {
|
|
||||||
return r.delegate.DeleteMinion(ctx, minionID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *HealthyRegistry) CreateMinion(ctx api.Context, minion *api.Node) error {
|
|
||||||
return r.delegate.CreateMinion(ctx, minion)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *HealthyRegistry) UpdateMinion(ctx api.Context, minion *api.Node) error {
|
|
||||||
return r.delegate.UpdateMinion(ctx, minion)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.NodeList, err error) {
|
|
||||||
list, err := r.delegate.ListMinions(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// In case the cache is empty, health check in parallel instead of serially.
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(len(list.Items))
|
|
||||||
for i := range list.Items {
|
|
||||||
go func(i int) {
|
|
||||||
list.Items[i] = *r.checkMinion(&list.Items[i])
|
|
||||||
wg.Done()
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *HealthyRegistry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
|
||||||
w, err := r.delegate.WatchMinions(ctx, label, field, resourceVersion)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return watch.Filter(w, watch.FilterFunc(func(in watch.Event) (watch.Event, bool) {
|
|
||||||
if node, ok := in.Object.(*api.Node); ok && node != nil {
|
|
||||||
in.Object = r.checkMinion(node)
|
|
||||||
}
|
|
||||||
return in, true
|
|
||||||
})), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node {
|
|
||||||
condition := r.cache.Get(node.Name).(api.NodeConditionStatus)
|
|
||||||
// TODO: distinguish other conditions like Reachable/Live, and begin storing this
|
|
||||||
// data on nodes directly via sync loops.
|
|
||||||
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
|
|
||||||
Kind: api.NodeReady,
|
|
||||||
Status: condition,
|
|
||||||
})
|
|
||||||
return node
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is called to fill the cache.
|
|
||||||
func (r *HealthyRegistry) doCheck(key string) util.T {
|
|
||||||
var nodeStatus api.NodeConditionStatus
|
|
||||||
switch status, err := r.client.HealthCheck(key); {
|
|
||||||
case err != nil:
|
|
||||||
glog.V(2).Infof("HealthyRegistry: node %q health check error: %v", key, err)
|
|
||||||
nodeStatus = api.ConditionUnknown
|
|
||||||
case status == probe.Failure:
|
|
||||||
nodeStatus = api.ConditionNone
|
|
||||||
default:
|
|
||||||
nodeStatus = api.ConditionFull
|
|
||||||
}
|
|
||||||
glog.V(3).Infof("HealthyRegistry: node %q status was %q", key, nodeStatus)
|
|
||||||
return nodeStatus
|
|
||||||
}
|
|
@ -1,114 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2014 Google Inc. All rights reserved.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package minion
|
|
||||||
|
|
||||||
import (
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
type alwaysYes struct{}
|
|
||||||
|
|
||||||
func (alwaysYes) HealthCheck(host string) (probe.Status, error) {
|
|
||||||
return probe.Success, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBasicDelegation(t *testing.T) {
|
|
||||||
ctx := api.NewContext()
|
|
||||||
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
|
|
||||||
healthy := NewHealthyRegistry(
|
|
||||||
mockMinionRegistry,
|
|
||||||
alwaysYes{},
|
|
||||||
&util.FakeClock{},
|
|
||||||
60*time.Second,
|
|
||||||
)
|
|
||||||
list, err := healthy.ListMinions(ctx)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(list, &mockMinionRegistry.Minions) {
|
|
||||||
t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list)
|
|
||||||
}
|
|
||||||
err = healthy.CreateMinion(ctx, &api.Node{
|
|
||||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
minion, err := healthy.GetMinion(ctx, "m1")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if minion == nil {
|
|
||||||
t.Errorf("Unexpected absence of 'm1'")
|
|
||||||
}
|
|
||||||
minion, err = healthy.GetMinion(ctx, "m5")
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("unexpected non-error")
|
|
||||||
}
|
|
||||||
if minion != nil {
|
|
||||||
t.Errorf("Unexpected presence of 'm5'")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type notMinion struct {
|
|
||||||
minion string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *notMinion) HealthCheck(host string) (probe.Status, error) {
|
|
||||||
if host != n.minion {
|
|
||||||
return probe.Success, nil
|
|
||||||
} else {
|
|
||||||
return probe.Failure, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFiltering(t *testing.T) {
|
|
||||||
ctx := api.NewContext()
|
|
||||||
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
|
|
||||||
healthy := NewHealthyRegistry(
|
|
||||||
mockMinionRegistry,
|
|
||||||
¬Minion{minion: "m1"},
|
|
||||||
&util.FakeClock{},
|
|
||||||
60*time.Second,
|
|
||||||
)
|
|
||||||
expected := []string{"m1", "m2", "m3"}
|
|
||||||
list, err := healthy.ListMinions(ctx)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
expectedMinions := registrytest.MakeMinionList(expected, api.NodeResources{})
|
|
||||||
expectedMinions.Items[0].Status.Conditions = []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionNone}}
|
|
||||||
expectedMinions.Items[1].Status.Conditions = []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}
|
|
||||||
expectedMinions.Items[2].Status.Conditions = []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}
|
|
||||||
if !reflect.DeepEqual(list, expectedMinions) {
|
|
||||||
t.Errorf("Expected %v, Got %v", expected, list)
|
|
||||||
}
|
|
||||||
minion, err := healthy.GetMinion(ctx, "m1")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if minion == nil {
|
|
||||||
t.Errorf("Unexpected empty 'm1'")
|
|
||||||
}
|
|
||||||
}
|
|
@ -18,13 +18,11 @@ package minion
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMinionRegistryREST(t *testing.T) {
|
func TestMinionRegistryREST(t *testing.T) {
|
||||||
@ -89,57 +87,6 @@ func TestMinionRegistryREST(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMinionRegistryHealthCheck(t *testing.T) {
|
|
||||||
minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{})
|
|
||||||
minionHealthRegistry := NewHealthyRegistry(
|
|
||||||
minionRegistry,
|
|
||||||
¬Minion{minion: "m1"},
|
|
||||||
&util.FakeClock{},
|
|
||||||
60*time.Second,
|
|
||||||
)
|
|
||||||
|
|
||||||
ms := NewREST(minionHealthRegistry)
|
|
||||||
ctx := api.NewContext()
|
|
||||||
|
|
||||||
c, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "m1"}})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("insert failed: %v", err)
|
|
||||||
}
|
|
||||||
result := <-c
|
|
||||||
if m, ok := result.Object.(*api.Node); !ok || m.Name != "m1" {
|
|
||||||
t.Errorf("insert return value was weird: %#v", result)
|
|
||||||
}
|
|
||||||
if _, err := ms.Get(ctx, "m1"); err != nil {
|
|
||||||
t.Errorf("node is unhealthy, expect no error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func contains(nodes *api.NodeList, nodeID string) bool {
|
|
||||||
for _, node := range nodes.Items {
|
|
||||||
if node.Name == nodeID {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMinionRegistryInvalidUpdate(t *testing.T) {
|
|
||||||
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
|
||||||
ctx := api.NewContext()
|
|
||||||
obj, err := storage.Get(ctx, "foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
minion, ok := obj.(*api.Node)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Object is not a minion: %#v", obj)
|
|
||||||
}
|
|
||||||
minion.Status.HostIP = "1.2.3.4"
|
|
||||||
if _, err = storage.Update(ctx, minion); err == nil {
|
|
||||||
t.Error("Unexpected non-error.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMinionRegistryValidUpdate(t *testing.T) {
|
func TestMinionRegistryValidUpdate(t *testing.T) {
|
||||||
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||||
ctx := api.NewContext()
|
ctx := api.NewContext()
|
||||||
@ -192,3 +139,12 @@ func TestMinionRegistryValidatesCreate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func contains(nodes *api.NodeList, nodeID string) bool {
|
||||||
|
for _, node := range nodes.Items {
|
||||||
|
if node.Name == nodeID {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -36,6 +36,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
@ -134,8 +135,9 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
|
|||||||
api.ResourceMemory: *resource.NewQuantity(nodeMemory, resource.BinarySI),
|
api.ResourceMemory: *resource.NewQuantity(nodeMemory, resource.BinarySI),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl)
|
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
|
||||||
nodeController.Run(10 * time.Second)
|
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient)
|
||||||
|
nodeController.Run(10*time.Second, 10)
|
||||||
|
|
||||||
endpoints := service.NewEndpointController(cl)
|
endpoints := service.NewEndpointController(cl)
|
||||||
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
|
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
|
||||||
|
@ -205,9 +205,8 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
|
|||||||
nodes.Items = append(nodes.Items, node)
|
nodes.Items = append(nodes.Items, node)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// If no condition is set, either node health check is disabled (master
|
// If no condition is set, we get unknown node condition. In such cases,
|
||||||
// flag "healthCheckMinions" is set to false), or we get unknown condition.
|
// we add nodes unconditionally.
|
||||||
// In such cases, we add nodes unconditionally.
|
|
||||||
nodes.Items = append(nodes.Items, node)
|
nodes.Items = append(nodes.Items, node)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -203,7 +203,7 @@ func getTestRequests() []struct {
|
|||||||
// Normal methods on services
|
// Normal methods on services
|
||||||
{"GET", "/api/v1beta1/services", "", code200},
|
{"GET", "/api/v1beta1/services", "", code200},
|
||||||
{"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200},
|
{"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200},
|
||||||
{"PUT", "/api/v1beta1/services/a" + timeoutFlag, aService, code409}, // TODO: GET and put back server-provided fields to avoid a 422
|
{"PUT", "/api/v1beta1/services/a" + timeoutFlag, aService, code409}, // See #2115 about why 409
|
||||||
{"GET", "/api/v1beta1/services", "", code200},
|
{"GET", "/api/v1beta1/services", "", code200},
|
||||||
{"GET", "/api/v1beta1/services/a", "", code200},
|
{"GET", "/api/v1beta1/services/a", "", code200},
|
||||||
{"DELETE", "/api/v1beta1/services/a" + timeoutFlag, "", code200},
|
{"DELETE", "/api/v1beta1/services/a" + timeoutFlag, "", code200},
|
||||||
@ -227,7 +227,7 @@ func getTestRequests() []struct {
|
|||||||
// Normal methods on minions
|
// Normal methods on minions
|
||||||
{"GET", "/api/v1beta1/minions", "", code200},
|
{"GET", "/api/v1beta1/minions", "", code200},
|
||||||
{"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code200},
|
{"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code200},
|
||||||
{"PUT", "/api/v1beta1/minions/a" + timeoutFlag, aMinion, code422}, // TODO: GET and put back server-provided fields to avoid a 422
|
{"PUT", "/api/v1beta1/minions/a" + timeoutFlag, aMinion, code409}, // See #2115 about why 409
|
||||||
{"GET", "/api/v1beta1/minions", "", code200},
|
{"GET", "/api/v1beta1/minions", "", code200},
|
||||||
{"GET", "/api/v1beta1/minions/a", "", code200},
|
{"GET", "/api/v1beta1/minions/a", "", code200},
|
||||||
{"DELETE", "/api/v1beta1/minions/a" + timeoutFlag, "", code200},
|
{"DELETE", "/api/v1beta1/minions/a" + timeoutFlag, "", code200},
|
||||||
|
Loading…
Reference in New Issue
Block a user