Move hostip to sync status

This commit is contained in:
Deyuan Deng 2015-01-30 15:50:47 -05:00
parent 6432b563f3
commit 53d44a6f5f
2 changed files with 101 additions and 33 deletions

View File

@ -18,7 +18,6 @@ package controller
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"reflect" "reflect"
"sync" "sync"
@ -34,6 +33,8 @@ import (
var ( var (
ErrRegistration = errors.New("unable to register all nodes.") ErrRegistration = errors.New("unable to register all nodes.")
ErrQueryIPAddress = errors.New("unable to query IP address.")
ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
) )
type NodeController struct { type NodeController struct {
@ -79,12 +80,16 @@ func (s *NodeController) Run(period time.Duration, retryCount int) {
} else { } else {
nodes, err = s.StaticNodes() nodes, err = s.StaticNodes()
if err != nil { if err != nil {
glog.Errorf("Error loading initial static nodes") glog.Errorf("Error loading initial static nodes: %v", err)
} }
} }
nodes = s.DoChecks(nodes) nodes = s.DoChecks(nodes)
if err := s.RegisterNodes(nodes, retryCount, period); err != nil { nodes, err = s.PopulateIPs(nodes)
glog.Errorf("Error registrying node list: %+v", nodes) if err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
if err = s.RegisterNodes(nodes, retryCount, period); err != nil {
glog.Errorf("Error registrying node list %+v: %v", nodes, err)
} }
// Start syncing node list from cloudprovider. // Start syncing node list from cloudprovider.
@ -183,6 +188,10 @@ func (s *NodeController) SyncNodeStatus() error {
oldNodes[node.Name] = node oldNodes[node.Name] = node
} }
nodes = s.DoChecks(nodes) nodes = s.DoChecks(nodes)
nodes, err = s.PopulateIPs(nodes)
if err != nil {
return err
}
for _, node := range nodes.Items { for _, node := range nodes.Items {
if reflect.DeepEqual(node, oldNodes[node.Name]) { if reflect.DeepEqual(node, oldNodes[node.Name]) {
glog.V(2).Infof("skip updating node %v", node.Name) glog.V(2).Infof("skip updating node %v", node.Name)
@ -197,6 +206,43 @@ func (s *NodeController) SyncNodeStatus() error {
return nil return nil
} }
// PopulateIPs queries IPs for given list of nodes.
func (s *NodeController) PopulateIPs(nodes *api.NodeList) (*api.NodeList, error) {
if s.isRunningCloudProvider() {
instances, ok := s.cloud.Instances()
if !ok {
return nodes, ErrCloudInstance
}
for i := range nodes.Items {
node := &nodes.Items[i]
hostIP, err := instances.IPAddress(node.Name)
if err != nil {
glog.Errorf("error getting instance ip address for %s: %v", node.Name, err)
} else {
node.Status.HostIP = hostIP.String()
}
}
} else {
for i := range nodes.Items {
node := &nodes.Items[i]
addr := net.ParseIP(node.Name)
if addr != nil {
node.Status.HostIP = node.Name
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", node.Name)
} else {
node.Status.HostIP = addrs[0].String()
}
}
}
}
return nodes, nil
}
// DoChecks performs health checking for given list of nodes. // DoChecks performs health checking for given list of nodes.
func (s *NodeController) DoChecks(nodes *api.NodeList) *api.NodeList { func (s *NodeController) DoChecks(nodes *api.NodeList) *api.NodeList {
var wg sync.WaitGroup var wg sync.WaitGroup
@ -245,19 +291,6 @@ func (s *NodeController) StaticNodes() (*api.NodeList, error) {
ObjectMeta: api.ObjectMeta{Name: nodeID}, ObjectMeta: api.ObjectMeta{Name: nodeID},
Spec: api.NodeSpec{Capacity: s.staticResources.Capacity}, Spec: api.NodeSpec{Capacity: s.staticResources.Capacity},
} }
addr := net.ParseIP(nodeID)
if addr != nil {
node.Status.HostIP = nodeID
} else {
addrs, err := net.LookupIP(nodeID)
if err != nil {
glog.Errorf("Can't get ip address of node %v", nodeID)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", nodeID)
} else {
node.Status.HostIP = addrs[0].String()
}
}
result.Items = append(result.Items, node) result.Items = append(result.Items, node)
} }
return result, nil return result, nil
@ -269,7 +302,7 @@ func (s *NodeController) CloudNodes() (*api.NodeList, error) {
result := &api.NodeList{} result := &api.NodeList{}
instances, ok := s.cloud.Instances() instances, ok := s.cloud.Instances()
if !ok { if !ok {
return result, fmt.Errorf("cloud doesn't support instances") return result, ErrCloudInstance
} }
matches, err := instances.List(s.matchRE) matches, err := instances.List(s.matchRE)
if err != nil { if err != nil {
@ -278,12 +311,6 @@ func (s *NodeController) CloudNodes() (*api.NodeList, error) {
for i := range matches { for i := range matches {
node := api.Node{} node := api.Node{}
node.Name = matches[i] node.Name = matches[i]
hostIP, err := instances.IPAddress(matches[i])
if err != nil {
glog.Errorf("error getting instance ip address for %s: %v", matches[i], err)
} else {
node.Status.HostIP = hostIP.String()
}
resources, err := instances.GetNodeResources(matches[i]) resources, err := instances.GetNodeResources(matches[i])
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -18,7 +18,6 @@ package controller
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"reflect" "reflect"
"sort" "sort"
@ -59,7 +58,7 @@ func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
m.CreatedNodes = append(m.CreatedNodes, &nodeCopy) m.CreatedNodes = append(m.CreatedNodes, &nodeCopy)
return node, nil return node, nil
} else { } else {
return nil, fmt.Errorf("Create error.") return nil, errors.New("Create error.")
} }
} }
@ -255,7 +254,6 @@ func TestCreateCloudNodes(t *testing.T) {
{ {
fakeCloud: &fake_cloud.FakeCloud{ fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"}, Machines: []string{"node0"},
IP: net.ParseIP("1.2.3.4"),
NodeResources: &api.NodeResources{Capacity: resourceList}, NodeResources: &api.NodeResources{Capacity: resourceList},
}, },
expectedNodes: &api.NodeList{ expectedNodes: &api.NodeList{
@ -263,7 +261,6 @@ func TestCreateCloudNodes(t *testing.T) {
{ {
ObjectMeta: api.ObjectMeta{Name: "node0"}, ObjectMeta: api.ObjectMeta{Name: "node0"},
Spec: api.NodeSpec{Capacity: resourceList}, Spec: api.NodeSpec{Capacity: resourceList},
Status: api.NodeStatus{HostIP: "1.2.3.4"},
}, },
}, },
}, },
@ -404,10 +401,45 @@ func TestHealthCheckNode(t *testing.T) {
} }
} }
func TestPopulateNodeIPs(t *testing.T) {
table := []struct {
nodes *api.NodeList
fakeCloud *fake_cloud.FakeCloud
expectedFail bool
expectedIP string
}{
{
nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}},
fakeCloud: &fake_cloud.FakeCloud{IP: net.ParseIP("1.2.3.4")},
expectedIP: "1.2.3.4",
},
{
nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}},
fakeCloud: &fake_cloud.FakeCloud{Err: ErrQueryIPAddress},
expectedIP: "",
},
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil)
result, err := nodeController.PopulateIPs(item.nodes)
// In case of IP querying error, we should continue.
if err != nil {
t.Errorf("unexpected error: %v", err)
}
for _, node := range result.Items {
if node.Status.HostIP != item.expectedIP {
t.Errorf("expect HostIP %s, got %s", item.expectedIP, node.Status.HostIP)
}
}
}
}
func TestSyncNodeStatus(t *testing.T) { func TestSyncNodeStatus(t *testing.T) {
table := []struct { table := []struct {
fakeNodeHandler *FakeNodeHandler fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient fakeKubeletClient *FakeKubeletClient
fakeCloud *fake_cloud.FakeCloud
expectedNodes []*api.Node expectedNodes []*api.Node
expectedRequestCount int expectedRequestCount int
}{ }{
@ -419,14 +451,23 @@ func TestSyncNodeStatus(t *testing.T) {
Status: probe.Success, Status: probe.Success,
Err: nil, Err: nil,
}, },
fakeCloud: &fake_cloud.FakeCloud{
IP: net.ParseIP("1.2.3.4"),
},
expectedNodes: []*api.Node{ expectedNodes: []*api.Node{
{ {
ObjectMeta: api.ObjectMeta{Name: "node0"}, ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}}, Status: api.NodeStatus{
Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}},
HostIP: "1.2.3.4",
},
}, },
{ {
ObjectMeta: api.ObjectMeta{Name: "node1"}, ObjectMeta: api.ObjectMeta{Name: "node1"},
Status: api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}}, Status: api.NodeStatus{
Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}},
HostIP: "1.2.3.4",
},
}, },
}, },
expectedRequestCount: 3, // List + 2xUpdate expectedRequestCount: 3, // List + 2xUpdate
@ -434,7 +475,7 @@ func TestSyncNodeStatus(t *testing.T) {
} }
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, "", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient) nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient)
if err := nodeController.SyncNodeStatus(); err != nil { if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -442,7 +483,7 @@ func TestSyncNodeStatus(t *testing.T) {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
} }
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
} }
item.fakeNodeHandler.RequestCount = 0 item.fakeNodeHandler.RequestCount = 0
if err := nodeController.SyncNodeStatus(); err != nil { if err := nodeController.SyncNodeStatus(); err != nil {