diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index d6aad693fae..6b74e2cd61f 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -221,7 +221,7 @@ func (s *CMServer) Run(_ []string) error {
 	nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
 
 	serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
-	if err := serviceController.Run(); err != nil {
+	if err := serviceController.Run(s.NodeSyncPeriod); err != nil {
 		glog.Errorf("Failed to start service controller: %v", err)
 	}
 
diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go
index 57e03f254e0..9a50165dcb0 100644
--- a/cmd/kubernetes/kubernetes.go
+++ b/cmd/kubernetes/kubernetes.go
@@ -130,12 +130,13 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 		},
 	}
 
+	const nodeSyncPeriod = 10 * time.Second
 	nodeController := nodecontroller.NewNodeController(
 		nil, "", machineList, nodeResources, cl, 10, 5*time.Minute,
 		util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst),
 		40*time.Second, 60*time.Second, 5*time.Second, "")
-	nodeController.Run(10*time.Second, true)
+	nodeController.Run(nodeSyncPeriod, true)
 
 	serviceController := servicecontroller.New(nil, cl, "kubernetes")
-	if err := serviceController.Run(); err != nil {
+	if err := serviceController.Run(nodeSyncPeriod); err != nil {
 		glog.Warningf("Running without a service controller: %v", err)
 	}
 
diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go
index f7921abb80c..fdb4f4916be 100644
--- a/pkg/cloudprovider/nodecontroller/nodecontroller.go
+++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go
@@ -87,8 +87,6 @@ type NodeController struct {
 	// TODO: Change node status monitor to watch based.
 	nodeMonitorPeriod time.Duration
 	clusterName       string
-	// Should external services be reconciled during syncing cloud nodes, even though the nodes were not changed.
-	reconcileServices bool
 	// Method for easy mocking in unittest.
 	lookupIP func(host string) ([]net.IP, error)
 	now      func() util.Time
@@ -221,62 +219,6 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
 	}
 }
 
-// reconcileExternalServices updates balancers for external services, so that they will match the nodes given.
-// Returns true if something went wrong and we should call reconcile again.
-func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) (shouldRetry bool) {
-	balancer, ok := nc.cloud.TCPLoadBalancer()
-	if !ok {
-		glog.Error("The cloud provider does not support external TCP load balancers.")
-		return false
-	}
-
-	zones, ok := nc.cloud.Zones()
-	if !ok {
-		glog.Error("The cloud provider does not support zone enumeration.")
-		return false
-	}
-	zone, err := zones.GetZone()
-	if err != nil {
-		glog.Errorf("Error while getting zone: %v", err)
-		return false
-	}
-
-	hosts := []string{}
-	for _, node := range nodes.Items {
-		hosts = append(hosts, node.Name)
-	}
-
-	services, err := nc.kubeClient.Services(api.NamespaceAll).List(labels.Everything())
-	if err != nil {
-		glog.Errorf("Error while listing services: %v", err)
-		return true
-	}
-	shouldRetry = false
-	for _, service := range services.Items {
-		if service.Spec.CreateExternalLoadBalancer {
-			nonTCPPort := false
-			for i := range service.Spec.Ports {
-				if service.Spec.Ports[i].Protocol != api.ProtocolTCP {
-					nonTCPPort = true
-					break
-				}
-			}
-			if nonTCPPort {
-				// TODO: Support UDP here.
-				glog.Errorf("External load balancers for non TCP services are not currently supported: %v.", service)
-				continue
-			}
-			name := cloudprovider.GetLoadBalancerName(&service)
-			err := balancer.UpdateTCPLoadBalancer(name, zone.Region, hosts)
-			if err != nil {
-				glog.Errorf("External error while updating TCP load balancer: %v.", err)
-				shouldRetry = true
-			}
-		}
-	}
-	return shouldRetry
-}
-
 // syncCloudNodes synchronizes the list of instances from cloudprovider to master server.
 func (nc *NodeController) syncCloudNodes() error {
 	matches, err := nc.getCloudNodesWithSpec()
@@ -295,7 +237,6 @@ func (nc *NodeController) syncCloudNodes() error {
 
 	// Create nodes which have been created in cloud, but not in kubernetes cluster
 	// Skip nodes if we hit an error while trying to get their addresses.
-	nodesChanged := false
 	for _, node := range matches.Items {
 		if _, ok := nodeMap[node.Name]; !ok {
 			glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
@@ -313,7 +254,6 @@ func (nc *NodeController) syncCloudNodes() error {
 			if err != nil {
 				glog.Errorf("Create node %s error: %v", node.Name, err)
 			}
-			nodesChanged = true
 		}
 		delete(nodeMap, node.Name)
 	}
@@ -326,15 +266,6 @@ func (nc *NodeController) syncCloudNodes() error {
 			glog.Errorf("Delete node %s error: %v", nodeID, err)
 		}
 		nc.deletePods(nodeID)
-		nodesChanged = true
-	}
-
-	// Make external services aware of nodes currently present in the cluster.
-	if nodesChanged || nc.reconcileServices {
-		nc.reconcileServices = nc.reconcileExternalServices(matches)
-		if nc.reconcileServices {
-			glog.Error("Reconcilation of external services failed and will be retried during the next sync.")
-		}
 	}
 
 	return nil
diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go
index 8155ed6c2b1..8fbbf795bd3 100644
--- a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go
+++ b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go
@@ -32,7 +32,6 @@ import (
 	fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
@@ -558,7 +557,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
 			matchRE:              ".*",
 			expectedRequestCount: 2, // List + Delete
 			expectedDeleted:      []string{"node1"},
-			expectedActions:      []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}, {Action: "list-services"}},
+			expectedActions:      []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
 		},
 		{
 			// Delete node1, but pod0 is running on node0.
@@ -572,7 +571,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
 			matchRE:              ".*",
 			expectedRequestCount: 2, // List + Delete
 			expectedDeleted:      []string{"node1"},
-			expectedActions:      []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}},
+			expectedActions:      []testclient.FakeAction{{Action: "list-pods"}},
 		},
 	}
 
@@ -598,71 +597,6 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
 	}
 }
 
-func TestSyncCloudNodesReconcilesExternalService(t *testing.T) {
-	table := []struct {
-		fakeNodeHandler       *FakeNodeHandler
-		fakeCloud             *fake_cloud.FakeCloud
-		matchRE               string
-		expectedClientActions []testclient.FakeAction
-		expectedUpdateCalls   []fake_cloud.FakeUpdateBalancerCall
-	}{
-		{
-			// Set of nodes does not change: do nothing.
-			fakeNodeHandler: &FakeNodeHandler{
-				Existing: []*api.Node{newNode("node0"), newNode("node1")},
-				Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", types.UID(""), true), *newService("service1", types.UID(""), false)}})},
-			fakeCloud: &fake_cloud.FakeCloud{
-				Machines: []string{"node0", "node1"},
-			},
-			matchRE:               ".*",
-			expectedClientActions: nil,
-			expectedUpdateCalls:   nil,
-		},
-		{
-			// Delete "node1", target pool for "service0" should shrink.
-			fakeNodeHandler: &FakeNodeHandler{
-				Existing: []*api.Node{newNode("node0"), newNode("node1")},
-				Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", types.UID("2c104a7c-e79e-11e4-8187-42010af0068a"), true), *newService("service1", types.UID(""), false)}})},
-			fakeCloud: &fake_cloud.FakeCloud{
-				Machines: []string{"node0"},
-			},
-			matchRE:               ".*",
-			expectedClientActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}},
-			expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
-				{Name: "a2c104a7ce79e11e4818742010af0068", Hosts: []string{"node0"}},
-			},
-		},
-		{
-			// Add "node1", target pool for "service0" should grow.
-			fakeNodeHandler: &FakeNodeHandler{
-				Existing: []*api.Node{newNode("node0")},
-				Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", types.UID("2c104a7c-e79e-11e4-8187-42010af0068a"), true), *newService("service1", types.UID(""), false)}})},
-			fakeCloud: &fake_cloud.FakeCloud{
-				Machines: []string{"node0", "node1"},
-			},
-			matchRE:               ".*",
-			expectedClientActions: []testclient.FakeAction{{Action: "list-services"}},
-			expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
-				{Name: "a2c104a7ce79e11e4818742010af0068", Hosts: []string{"node0", "node1"}},
-			},
-		},
-	}
-
-	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler,
-			10, time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "kubernetes")
-		if err := nodeController.syncCloudNodes(); err != nil {
-			t.Errorf("unexpected error: %v", err)
-		}
-		if !reflect.DeepEqual(item.expectedClientActions, item.fakeNodeHandler.Actions) {
-			t.Errorf("expected client actions mismatch, expected %+v, got %+v", item.expectedClientActions, item.fakeNodeHandler.Actions)
-		}
-		if !reflect.DeepEqual(item.expectedUpdateCalls, item.fakeCloud.UpdateCalls) {
-			t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, item.fakeCloud.UpdateCalls)
-		}
-	}
-}
-
 func TestPopulateNodeAddresses(t *testing.T) {
 	table := []struct {
 		nodes *api.NodeList
@@ -1129,10 +1063,6 @@ func newPod(name, host string) *api.Pod {
 	return &api.Pod{ObjectMeta: api.ObjectMeta{Name: name}, Spec: api.PodSpec{Host: host}}
 }
 
-func newService(name string, uid types.UID, external bool) *api.Service {
-	return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid}, Spec: api.ServiceSpec{CreateExternalLoadBalancer: external}}
-}
-
 func sortedNodeNames(nodes []*api.Node) []string {
 	nodeNames := []string{}
 	for _, node := range nodes {
diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go
index 055eb84b1f1..18d7b359423 100644
--- a/pkg/cloudprovider/servicecontroller/servicecontroller.go
+++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go
@@ -79,7 +79,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
 // Run starts a background goroutine that watches for changes to services that
 // have (or had) externalLoadBalancers=true and ensures that they have external
 // load balancers created and deleted appropriately.
-func (s *ServiceController) Run() error {
+// nodeSyncPeriod controls how often we check the cluster's nodes to determine
+// if external load balancers need to be updated to point to a new set of nodes.
+func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
 	if err := s.init(); err != nil {
 		return err
 	}
@@ -101,6 +103,11 @@ func (s *ServiceController) Run() error {
 	for i := 0; i < workerGoroutines; i++ {
 		go s.watchServices(serviceQueue)
 	}
+
+	nodeLister := &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
+	nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "nodes", api.NamespaceAll, fields.Everything())
+	cache.NewReflector(nodeLW, &api.Node{}, nodeLister.Store, 0).Run()
+	go s.nodeSyncLoop(nodeLister, nodeSyncPeriod)
 	return nil
 }
 
@@ -367,6 +374,18 @@ func (s *serviceCache) ListKeys() []string {
 	return keys
 }
 
+// allServices returns a snapshot of all the services currently held in the
+// cache, so callers can iterate over every service we know about.
+func (s *serviceCache) allServices() []*cachedService {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	services := make([]*cachedService, 0, len(s.serviceMap))
+	for _, v := range s.serviceMap {
+		services = append(services, v)
+	}
+	return services
+}
+
 func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -445,13 +464,39 @@ func portsEqual(x, y *api.Service) bool {
 	if err != nil {
 		return false
 	}
-	if len(xPorts) != len(yPorts) {
+	return intSlicesEqual(xPorts, yPorts)
+}
+
+func intSlicesEqual(x, y []int) bool {
+	if len(x) != len(y) {
 		return false
 	}
-	sort.Ints(xPorts)
-	sort.Ints(yPorts)
-	for i := range xPorts {
-		if xPorts[i] != yPorts[i] {
+	if !sort.IntsAreSorted(x) {
+		sort.Ints(x)
+	}
+	if !sort.IntsAreSorted(y) {
+		sort.Ints(y)
+	}
+	for i := range x {
+		if x[i] != y[i] {
+			return false
+		}
+	}
+	return true
+}
+
+func stringSlicesEqual(x, y []string) bool {
+	if len(x) != len(y) {
+		return false
+	}
+	if !sort.StringsAreSorted(x) {
+		sort.Strings(x)
+	}
+	if !sort.StringsAreSorted(y) {
+		sort.Strings(y)
+	}
+	for i := range x {
+		if x[i] != y[i] {
 			return false
 		}
 	}
@@ -465,3 +510,78 @@ func hostsFromNodeList(list *api.NodeList) []string {
 	}
 	return result
 }
+
+// nodeSyncLoop handles updating the hosts pointed to by all external load
+// balancers whenever the set of nodes in the cluster changes.
+func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, period time.Duration) {
+	var prevHosts []string
+	var servicesToUpdate []*cachedService
+	// TODO: Eliminate the unneeded now variable once we stop compiling in go1.3.
+	// It's needed at the moment because go1.3 requires ranges to be assigned to
+	// something to compile, and gofmt1.4 complains about using `_ = range`.
+	for now := range time.Tick(period) {
+		_ = now
+		nodes, err := nodeLister.List()
+		if err != nil {
+			glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
+			continue
+		}
+		newHosts := hostsFromNodeList(&nodes)
+		if stringSlicesEqual(newHosts, prevHosts) {
+			// The set of nodes in the cluster hasn't changed, but we can retry
+			// updating any services that we failed to update last time around.
+			servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
+			continue
+		}
+		glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
+
+		// Try updating all services, and save the ones that fail to try again next
+		// round.
+		servicesToUpdate = s.cache.allServices()
+		numServices := len(servicesToUpdate)
+		servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
+		glog.Infof("Successfully updated %d out of %d external load balancers to direct traffic to the updated set of nodes",
+			numServices-len(servicesToUpdate), numServices)
+
+		prevHosts = newHosts
+	}
+}
+
+// updateLoadBalancerHosts updates all existing external load balancers so that
+// they will match the list of hosts provided.
+// Returns the list of services that couldn't be updated.
+func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, hosts []string) (servicesToRetry []*cachedService) {
+	for _, service := range services {
+		func() {
+			service.mu.Lock()
+			defer service.mu.Unlock()
+			if err := s.lockedUpdateLoadBalancerHosts(service.service, hosts); err != nil {
+				glog.Errorf("External error while updating TCP load balancer: %v.", err)
+				servicesToRetry = append(servicesToRetry, service)
+			}
+		}()
+	}
+	return servicesToRetry
+}
+
+// Updates the external load balancer of a service, assuming we hold the mutex
+// associated with the service.
+func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, hosts []string) error {
+	if !service.Spec.CreateExternalLoadBalancer {
+		return nil
+	}
+
+	name := cloudprovider.GetLoadBalancerName(service)
+	err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts)
+	if err == nil {
+		return nil
+	}
+
+	// It's only an actual error if the load balancer still exists.
+	if exists, err := s.balancer.TCPLoadBalancerExists(name, s.zone.Region); err != nil {
+		glog.Errorf("External error while checking if TCP load balancer %q exists: %v", name, err)
+	} else if !exists {
+		return nil
+	}
+	return err
+}
diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go
index d1a5a46a817..b250bfd4cb3 100644
--- a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go
+++ b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package servicecontroller
 
 import (
+	"reflect"
 	"testing"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -27,6 +28,10 @@ import (
 
 const region = "us-central"
 
+func newService(name string, uid types.UID, external bool) *api.Service {
+	return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid}, Spec: api.ServiceSpec{CreateExternalLoadBalancer: external}}
+}
+
 func TestCreateExternalLoadBalancer(t *testing.T) {
 	table := []struct {
 		service           *api.Service
@@ -124,4 +129,82 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
 	}
 }
 
+// TODO: Finish converting and update comments
+func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
+	hosts := []string{"node0", "node1", "node73"}
+	table := []struct {
+		services            []*api.Service
+		expectedUpdateCalls []fake_cloud.FakeUpdateBalancerCall
+	}{
+		{
+			// No services present: no calls should be made.
+			services:            []*api.Service{},
+			expectedUpdateCalls: nil,
+		},
+		{
+			// Services do not have external load balancers: no calls should be made.
+			services: []*api.Service{
+				newService("s0", "111", false),
+				newService("s1", "222", false),
+			},
+			expectedUpdateCalls: nil,
+		},
+		{
+			// One service has an external load balancer: one call should be made.
+			services: []*api.Service{
+				newService("s0", "333", true),
+			},
+			expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+				{Name: "a333", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+			},
+		},
+		{
+			// Three services have an external load balancer: three calls.
+			services: []*api.Service{
+				newService("s0", "444", true),
+				newService("s1", "555", true),
+				newService("s2", "666", true),
+			},
+			expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+				{Name: "a444", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+				{Name: "a555", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+				{Name: "a666", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+			},
+		},
+		{
+			// Two services have an external load balancer and two don't: two calls.
+			services: []*api.Service{
+				newService("s0", "777", false),
+				newService("s1", "888", true),
+				newService("s3", "999", true),
+				newService("s4", "123", false),
+			},
+			expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+				{Name: "a888", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+				{Name: "a999", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+			},
+		},
+	}
+	for _, item := range table {
+		cloud := &fake_cloud.FakeCloud{}
+
+		cloud.Region = region
+		client := &testclient.Fake{}
+		controller := New(cloud, client, "test-cluster2")
+		controller.init()
+		cloud.Calls = nil // ignore any cloud calls made in init()
+
+		var services []*cachedService
+		for _, service := range item.services {
+			services = append(services, &cachedService{service: service})
+		}
+		if servicesToRetry := controller.updateLoadBalancerHosts(services, hosts); len(servicesToRetry) > 0 {
+			t.Errorf("services that failed to update: %v", servicesToRetry)
+		}
+		if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
+			t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
+		}
+	}
+}
+
 // TODO(a-robinson): Add tests for update/sync/delete.
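
Reviewer note: the heart of this change is the retry contract between nodeSyncLoop and updateLoadBalancerHosts. Services whose balancers fail to update are returned to the caller and retried on the next tick; a change in the node set triggers a fresh pass over every cached service. Below is a minimal, self-contained Go sketch of that pattern. The names service, updateAll, and sortedEqual are illustrative stand-ins, not the Kubernetes types or cloud provider APIs.

package main

import (
	"fmt"
	"sort"
)

type service struct{ name string }

// updateAll plays the role of updateLoadBalancerHosts: it points each
// service's balancer at hosts and returns the services that failed so the
// caller can retry them on the next tick. fail simulates provider errors.
func updateAll(services []*service, hosts []string, fail map[string]bool) (retry []*service) {
	for _, s := range services {
		if fail[s.name] {
			retry = append(retry, s)
			continue
		}
		fmt.Printf("updated %s -> %v\n", s.name, hosts)
	}
	return retry
}

// sortedEqual reports whether two host lists are equal ignoring order,
// mirroring stringSlicesEqual above (it sorts its inputs in place).
func sortedEqual(x, y []string) bool {
	if len(x) != len(y) {
		return false
	}
	sort.Strings(x)
	sort.Strings(y)
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}

func main() {
	all := []*service{{"s0"}, {"s1"}}
	var prevHosts []string
	var toRetry []*service
	// Three simulated ticks of the sync loop: s1 fails on the first tick, is
	// retried alone on the second (node set unchanged), and the third tick's
	// node change forces a full pass over all services again.
	ticks := []struct {
		hosts []string
		fail  map[string]bool
	}{
		{[]string{"node0", "node1"}, map[string]bool{"s1": true}},
		{[]string{"node0", "node1"}, nil},
		{[]string{"node0"}, nil},
	}
	for i, tick := range ticks {
		fmt.Println("tick", i)
		if sortedEqual(tick.hosts, prevHosts) {
			toRetry = updateAll(toRetry, tick.hosts, tick.fail)
			continue
		}
		toRetry = updateAll(all, tick.hosts, tick.fail)
		prevHosts = tick.hosts
	}
}

The real loop is driven by time.Tick(nodeSyncPeriod) and reads hosts from the reflector-backed node lister rather than a canned slice, but the bookkeeping of the servicesToUpdate slice follows the same shape as this sketch.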