From eb253a694e965ae8638d4b45433f812607019e91 Mon Sep 17 00:00:00 2001 From: Jerzy Szczepkowski Date: Wed, 1 Apr 2015 14:52:28 +0200 Subject: [PATCH 1/2] Updating target pools on cloud node changes. Implemented updating target pools for external services on change of cloud nodes. Related to #5241. --- cmd/integration/integration.go | 2 +- .../app/controllermanager.go | 7 +- cmd/kubernetes/kubernetes.go | 2 +- pkg/cloudprovider/cloud.go | 4 + .../controller/nodecontroller.go | 65 +++++++++++++- .../controller/nodecontroller_test.go | 89 ++++++++++++++++--- pkg/cloudprovider/fake/fake.go | 9 +- pkg/cloudprovider/gce/gce.go | 46 ++++++++-- pkg/registry/service/rest.go | 9 +- 9 files changed, 204 insertions(+), 29 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index fdff8ee62f0..67eaff339fa 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -225,7 +225,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), }} - nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second) + nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "") nodeController.Run(5*time.Second, true) cadvisorInterface := new(cadvisor.Fake) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 8d6f33d7d9a..e7894708264 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -70,6 +70,7 @@ type CMServer struct { NodeMemory resource.Quantity KubeletConfig client.KubeletConfig + ClusterName string EnableProfiling bool } @@ -91,6 +92,7 @@ func NewCMServer() *CMServer { EnableHttps: true, HTTPTimeout: time.Duration(5) * time.Second, }, + ClusterName: "kubernetes", } return &s } @@ -132,7 +134,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node") fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node_memory", "The amount of memory (in bytes) provisioned on each node") client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig) + fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") } func (s *CMServer) verifyMinionFlags() { @@ -198,7 +201,7 @@ func (s *CMServer) Run(_ []string) error { nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), - s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod) + s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName ) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) diff --git 
a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index f32e253db93..a610c840e4f 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -131,7 +131,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort} nodeController := nodeControllerPkg.NewNodeController( - nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second) + nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "") nodeController.Run(10*time.Second, true) endpoints := service.NewEndpointController(cl) diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index c50d0627ebb..3f76e80147f 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -42,6 +42,10 @@ type Clusters interface { Master(clusterName string) (string, error) } +func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) string { + return clusterName + "-" + serviceNamespace + "-" + serviceName +} + // TCPLoadBalancer is an abstract, pluggable interface for TCP load balancers. type TCPLoadBalancer interface { // TCPLoadBalancerExists returns whether the specified load balancer exists. diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 953d6e76b10..9d0989ffca4 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -88,6 +88,7 @@ type NodeController struct { // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod. // TODO: Change node status monitor to watch based. nodeMonitorPeriod time.Duration + clusterName string // Method for easy mocking in unittest. lookupIP func(host string) ([]net.IP, error) now func() util.Time @@ -106,7 +107,8 @@ func NewNodeController( deletingPodsRateLimiter util.RateLimiter, nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, - nodeMonitorPeriod time.Duration) *NodeController { + nodeMonitorPeriod time.Duration, + clusterName string) *NodeController { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}) if kubeClient != nil { @@ -132,6 +134,7 @@ func NewNodeController( nodeStartupGracePeriod: nodeStartupGracePeriod, lookupIP: net.LookupIP, now: util.Now, + clusterName: clusterName, } } @@ -219,6 +222,58 @@ func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, ret } } +// reconcileExternalServices updates balancers for external services, so that they will match the nodes given. 
+func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) { + balancer, ok := nc.cloud.TCPLoadBalancer() + if !ok { + glog.Error("The cloud provider does not support external TCP load balancers.") + return + } + + zones, ok := nc.cloud.Zones() + if !ok { + glog.Error("The cloud provider does not support zone enumeration.") + return + } + zone, err := zones.GetZone() + if err != nil { + glog.Errorf("Error while getting zone: %v", err) + return + } + + hosts := []string{} + for _, node := range nodes.Items { + hosts = append(hosts, node.Name) + } + + services, err := nc.kubeClient.Services(api.NamespaceAll).List(labels.Everything()) + if err != nil { + glog.Errorf("Error while listing services: %v", err) + return + } + for _, service := range services.Items { + if service.Spec.CreateExternalLoadBalancer { + nonTCPPort := false + for i := range service.Spec.Ports { + if service.Spec.Ports[i].Protocol != api.ProtocolTCP { + nonTCPPort = true + break + } + } + if nonTCPPort { + // TODO: Support UDP here. + glog.Errorf("External load balancers for non-TCP services are not currently supported: %v.", service) + continue + } + name := cloudprovider.GetLoadBalancerName(nc.clusterName, service.Namespace, service.Name) + err := balancer.UpdateTCPLoadBalancer(name, zone.Region, hosts) + if err != nil { + glog.Errorf("Error while updating external TCP load balancer: %v.", err) + } + } + } +} + // SyncCloudNodes synchronizes the list of instances from cloudprovider to master server. func (nc *NodeController) SyncCloudNodes() error { matches, err := nc.GetCloudNodesWithSpec() @@ -237,6 +292,7 @@ func (nc *NodeController) SyncCloudNodes() error { // Create nodes which have been created in cloud, but not in kubernetes cluster // Skip nodes if we hit an error while trying to get their addresses. + nodesChanged := false for _, node := range matches.Items { if _, ok := nodeMap[node.Name]; !ok { glog.V(3).Infof("Querying addresses for new node: %s", node.Name) @@ -254,6 +310,7 @@ func (nc *NodeController) SyncCloudNodes() error { if err != nil { glog.Errorf("Create node %s error: %v", node.Name, err) } + nodesChanged = true } delete(nodeMap, node.Name) } @@ -266,6 +323,12 @@ func (nc *NodeController) SyncCloudNodes() error { glog.Errorf("Delete node %s error: %v", nodeID, err) } nc.deletePods(nodeID) + nodesChanged = true + } + + // Make external services aware of nodes currently present in the cluster. 
+ if nodesChanged { + nc.reconcileExternalServices(matches) } return nil diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 48479cdd9fb..af5ff3a339b 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -262,7 +262,7 @@ func TestRegisterNodes(t *testing.T) { nodes.Items = append(nodes.Items, *newNode(machine)) } nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod) + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond) if !item.expectedFail && err != nil { t.Errorf("unexpected error: %v", err) @@ -348,7 +348,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) { } for _, item := range table { nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod) + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") nodes, err := nodeController.GetStaticNodesWithSpec() if err != nil { t.Errorf("unexpected error: %v", err) @@ -410,7 +410,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod) + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") nodes, err := nodeController.GetCloudNodesWithSpec() if err != nil { t.Errorf("unexpected error: %v", err) @@ -520,7 +520,7 @@ func TestSyncCloudNodes(t *testing.T) { item.fakeNodeHandler.Fake = testclient.NewSimpleFake() } nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod) + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -581,7 +581,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) { matchRE: ".*", expectedRequestCount: 2, // List + Delete expectedDeleted: []string{"node1"}, - expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, + expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}, {Action: "list-services"}}, }, { // Delete node1, but pod0 is running on node0. 
@@ -595,7 +595,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) { matchRE: ".*", expectedRequestCount: 2, // List + Delete expectedDeleted: []string{"node1"}, - expectedActions: []testclient.FakeAction{{Action: "list-pods"}}, + expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}}, }, } @@ -604,7 +604,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) { item.fakeNodeHandler.Fake = testclient.NewSimpleFake() } nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod) + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -621,6 +621,71 @@ func TestSyncCloudNodesEvictPods(t *testing.T) { } } +func TestSyncCloudNodesReconcilesExternalService(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeCloud *fake_cloud.FakeCloud + matchRE string + expectedClientActions []testclient.FakeAction + expectedUpdateCalls []fake_cloud.FakeUpdateBalancerCall + }{ + { + // Set of nodes does not change: do nothing. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1"}, + }, + matchRE: ".*", + expectedClientActions: nil, + expectedUpdateCalls: nil, + }, + { + // Delete "node1", target pool for "service0" should shrink. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + }, + matchRE: ".*", + expectedClientActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}}, + expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{ + {Name: "kubernetes-namespace-service0", Hosts: []string{"node0"}}, + }, + }, + { + // Add "node1", target pool for "service0" should grow. 
+ fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0")}, + Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1"}, + }, + matchRE: ".*", + expectedClientActions: []testclient.FakeAction{{Action: "list-services"}}, + expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{ + {Name: "kubernetes-namespace-service0", Hosts: []string{"node0", "node1"}}, + }, + }, + } + + for _, item := range table { + nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, + 10, time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "kubernetes") + if err := nodeController.SyncCloudNodes(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(item.expectedClientActions, item.fakeNodeHandler.Actions) { + t.Errorf("expected client actions mismatch, expected %+v, got %+v", item.expectedClientActions, item.fakeNodeHandler.Actions) + } + if !reflect.DeepEqual(item.expectedUpdateCalls, item.fakeCloud.UpdateCalls) { + t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, item.fakeCloud.UpdateCalls) + } + } +} + func TestPopulateNodeAddresses(t *testing.T) { table := []struct { nodes *api.NodeList @@ -644,7 +709,7 @@ func TestPopulateNodeAddresses(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod) + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") result, err := nodeController.PopulateAddresses(item.nodes) // In case of IP querying error, we should continue. 
if err != nil { @@ -844,7 +909,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod) + testNodeStartupGracePeriod, testNodeMonitorPeriod, "") nodeController.now = func() util.Time { return fakeNow } if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -1046,7 +1111,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter(), - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod) + testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") nodeController.now = func() util.Time { return fakeNow } if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -1087,6 +1152,10 @@ func newPod(name, host string) *api.Pod { return &api.Pod{ObjectMeta: api.ObjectMeta{Name: name}, Spec: api.PodSpec{Host: host}} } +func newService(name string, external bool) *api.Service { + return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace"}, Spec: api.ServiceSpec{CreateExternalLoadBalancer: external}} +} + func sortedNodeNames(nodes []*api.Node) []string { nodeNames := []string{} for _, node := range nodes { diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index 624c2d61d86..664fd175be9 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -33,6 +33,12 @@ type FakeBalancer struct { Hosts []string } +type FakeUpdateBalancerCall struct { + Name string + Region string + Hosts []string +} + // FakeCloud is a test-double implementation of Interface, TCPLoadBalancer and Instances. It is useful for testing. type FakeCloud struct { Exists bool @@ -46,7 +52,7 @@ type FakeCloud struct { MasterName string ExternalIP net.IP Balancers []FakeBalancer - + UpdateCalls []FakeUpdateBalancerCall cloudprovider.Zone } @@ -105,6 +111,7 @@ func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP // It adds an entry "update" into the internal method call record. 
func (f *FakeCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { f.addCall("update") + f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts}) return f.Err } diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 745c8df8998..188aa34ba52 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -32,6 +32,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "code.google.com/p/gcfg" compute "code.google.com/p/google-api-go-client/compute/v1" @@ -95,7 +96,12 @@ func getProjectAndZone() (string, string, error) { if len(parts) != 4 { return "", "", fmt.Errorf("unexpected response: %s", result) } - return parts[1], parts[3], nil + zone := parts[3] + projectID, err := metadata.ProjectID() + if err != nil { + return "", "", err + } + return projectID, zone, nil } func getInstanceID() (string, error) { @@ -292,15 +298,41 @@ func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.I // UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer. func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { - var refs []*compute.InstanceReference - for _, host := range hosts { - refs = append(refs, &compute.InstanceReference{host}) + pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() + if err != nil { + return err } - req := &compute.TargetPoolsAddInstanceRequest{ - Instances: refs, + existing := util.NewStringSet(pool.Instances...) + + var toAdd []*compute.InstanceReference + var toRemove []*compute.InstanceReference + for _, host := range hosts { + link := makeHostLink(gce.projectID, gce.zone, host) + if !existing.Has(link) { + toAdd = append(toAdd, &compute.InstanceReference{link}) + } + existing.Delete(link) + } + for link := range existing { + toRemove = append(toRemove, &compute.InstanceReference{link}) } - op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, req).Do() + add := &compute.TargetPoolsAddInstanceRequest{ + Instances: toAdd, + } + op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do() + if err != nil { + return err + } + err = gce.waitForRegionOp(op, region) + if err != nil { + return err + } + + rm := &compute.TargetPoolsRemoveInstanceRequest{ + Instances: toRemove, + } + op, err = gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, rm).Do() if err != nil { return err } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 8f76deb7c43..fce902023de 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -273,10 +273,6 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou return nil, nil, fmt.Errorf("no endpoints available for %q", id) } -func (rs *REST) getLoadbalancerName(ctx api.Context, service *api.Service) string { - return rs.clusterName + "-" + api.NamespaceValue(ctx) + "-" + service.Name -} - func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error { if rs.cloud == nil { return fmt.Errorf("requested an external service, but no cloud provider supplied.") @@ -303,7 +299,7 @@ func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service if err != nil { return err } - name := rs.getLoadbalancerName(ctx, 
service) + name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name) var affinityType api.AffinityType = service.Spec.SessionAffinity if len(service.Spec.PublicIPs) > 0 { for _, publicIP := range service.Spec.PublicIPs { @@ -376,7 +372,8 @@ func (rs *REST) deleteExternalLoadBalancer(ctx api.Context, service *api.Service if err != nil { return err } - if err := balancer.DeleteTCPLoadBalancer(rs.getLoadbalancerName(ctx, service), zone.Region); err != nil { + name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name) + if err := balancer.DeleteTCPLoadBalancer(name, zone.Region); err != nil { return err } return nil From 1c042208c771f14fadb447d669aa510acf3c7a5b Mon Sep 17 00:00:00 2001 From: Jerzy Szczepkowski Date: Wed, 8 Apr 2015 13:05:33 +0200 Subject: [PATCH 2/2] Added retrying of balancer updates in case some of the updates failed. --- .../app/controllermanager.go | 2 +- .../controller/nodecontroller.go | 25 +++++++++++++------ .../controller/nodecontroller_test.go | 6 ++--- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index e7894708264..aa063b6a9f1 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -201,7 +201,7 @@ func (s *CMServer) Run(_ []string) error { nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), - s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName ) + s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 9d0989ffca4..72f3657ccd1 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -88,7 +88,9 @@ type NodeController struct { // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod. // TODO: Change node status monitor to watch based. nodeMonitorPeriod time.Duration - clusterName string + clusterName string + // Whether external services should be reconciled while syncing cloud nodes, even if the set of nodes has not changed. + reconcileServices bool // Method for easy mocking in unittest. lookupIP func(host string) ([]net.IP, error) now func() util.Time @@ -223,22 +225,23 @@ func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, ret } // reconcileExternalServices updates balancers for external services, so that they will match the nodes given. -func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) { +// Returns true if something went wrong and the reconciliation should be retried. 
+func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) (shouldRetry bool) { balancer, ok := nc.cloud.TCPLoadBalancer() if !ok { glog.Error("The cloud provider does not support external TCP load balancers.") - return + return false } zones, ok := nc.cloud.Zones() if !ok { glog.Error("The cloud provider does not support zone enumeration.") - return + return false } zone, err := zones.GetZone() if err != nil { glog.Errorf("Error while getting zone: %v", err) - return + return false } hosts := []string{} @@ -249,8 +252,9 @@ func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) { services, err := nc.kubeClient.Services(api.NamespaceAll).List(labels.Everything()) if err != nil { glog.Errorf("Error while listing services: %v", err) - return + return true } + shouldRetry = false for _, service := range services.Items { if service.Spec.CreateExternalLoadBalancer { nonTCPPort := false @@ -269,9 +273,11 @@ func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) { name := cloudprovider.GetLoadBalancerName(nc.clusterName, service.Namespace, service.Name) err := balancer.UpdateTCPLoadBalancer(name, zone.Region, hosts) if err != nil { glog.Errorf("Error while updating external TCP load balancer: %v.", err) + shouldRetry = true } } } + return shouldRetry } // SyncCloudNodes synchronizes the list of instances from cloudprovider to master server. @@ -327,8 +333,11 @@ func (nc *NodeController) SyncCloudNodes() error { } // Make external services aware of nodes currently present in the cluster. - if nodesChanged { - nc.reconcileExternalServices(matches) + if nodesChanged || nc.reconcileServices { + nc.reconcileServices = nc.reconcileExternalServices(matches) + if nc.reconcileServices { + glog.Error("Reconciliation of external services failed and will be retried during the next sync.") + } } return nil diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index af5ff3a339b..b0d17c2dcdf 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -633,7 +633,7 @@ func TestSyncCloudNodesReconcilesExternalService(t *testing.T) { // Set of nodes does not change: do nothing. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{newNode("node0"), newNode("node1")}, - Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, + Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, fakeCloud: &fake_cloud.FakeCloud{ Machines: []string{"node0", "node1"}, }, @@ -645,7 +645,7 @@ func TestSyncCloudNodesReconcilesExternalService(t *testing.T) { // Delete "node1", target pool for "service0" should shrink. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{newNode("node0"), newNode("node1")}, - Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, + Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, fakeCloud: &fake_cloud.FakeCloud{ Machines: []string{"node0"}, }, @@ -659,7 +659,7 @@ func TestSyncCloudNodesReconcilesExternalService(t *testing.T) { // Add "node1", target pool for "service0" should grow. 
fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{newNode("node0")}, - Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, + Fake: testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})}, fakeCloud: &fake_cloud.FakeCloud{ Machines: []string{"node0", "node1"}, },
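
Note on the GCE update path: before this patch, UpdateTCPLoadBalancer only ever issued AddInstance calls, so instances of deleted nodes were never removed from their target pools. The rewritten version reconciles by set difference: fetch the pool, add every desired host link that is missing, then remove every pool member that is no longer desired. A minimal, self-contained Go sketch of that core logic (the helper name diffHosts is hypothetical, and a plain map stands in for util.StringSet):

	package main

	import "fmt"

	// diffHosts returns which instance links must be added to and removed from
	// a target pool so that it ends up containing exactly the desired hosts.
	func diffHosts(existing, desired []string) (toAdd, toRemove []string) {
		current := make(map[string]bool, len(existing))
		for _, link := range existing {
			current[link] = true
		}
		for _, link := range desired {
			if !current[link] {
				toAdd = append(toAdd, link) // desired but not yet in the pool
			}
			delete(current, link) // anything left afterwards is stale
		}
		for link := range current {
			toRemove = append(toRemove, link)
		}
		return toAdd, toRemove
	}

	func main() {
		pool := []string{"zones/z1/instances/node0", "zones/z1/instances/node1"}
		want := []string{"zones/z1/instances/node0", "zones/z1/instances/node2"}
		add, remove := diffHosts(pool, want)
		fmt.Println("add:", add)       // node2 joins the pool
		fmt.Println("remove:", remove) // node1 leaves the pool
	}

Unlike the real method, the sketch performs no API calls; in gce.go the toAdd and toRemove slices feed a TargetPoolsAddInstanceRequest and a TargetPoolsRemoveInstanceRequest respectively.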