From 6d8f2dddec9b2693fce1488c61fd60e3b2212ded Mon Sep 17 00:00:00 2001 From: Daniel Dao Date: Tue, 1 Aug 2017 23:14:55 +0100 Subject: [PATCH] fed/clustercontroller: fix race when updating data updates for ClusterController's maps were made without locking which can lead to race conditions which were detectede in https://github.com/kubernetes/kubernetes/issues/49958 This change adds a RWMutex to protect the data. We lock and unlock them whenever the data is accessed to make sure that we dont slow down too much. Signed-off-by: Daniel Dao --- .../cluster/clustercontroller.go | 45 ++++++++++++++---- .../cluster/clustercontroller_test.go | 46 +++++++++++++++++++ 2 files changed, 82 insertions(+), 9 deletions(-) diff --git a/federation/pkg/federation-controller/cluster/clustercontroller.go b/federation/pkg/federation-controller/cluster/clustercontroller.go index ecf5904db60..47d3a268dba 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller.go @@ -18,6 +18,7 @@ package cluster import ( "strings" + "sync" "time" "github.com/golang/glog" @@ -36,16 +37,16 @@ import ( ) type ClusterController struct { - knownClusterSet sets.String - // federationClient used to operate cluster federationClient federationclientset.Interface // clusterMonitorPeriod is the period for updating status of cluster clusterMonitorPeriod time.Duration + + mu sync.RWMutex + knownClusterSet sets.String // clusterClusterStatusMap is a mapping of clusterName and cluster status of last sampling clusterClusterStatusMap map[string]federationv1beta1.ClusterStatus - // clusterKubeClientMap is a mapping of clusterName and restclient clusterKubeClientMap map[string]ClusterClient @@ -94,22 +95,35 @@ func newClusterController(federationClient federationclientset.Interface, cluste // delFromClusterSet delete a cluster from clusterSet and // delete the corresponding restclient from the map clusterKubeClientMap func (cc *ClusterController) delFromClusterSet(obj interface{}) { + cc.mu.Lock() + defer cc.mu.Unlock() cluster := obj.(*federationv1beta1.Cluster) cc.delFromClusterSetByName(cluster.Name) } // delFromClusterSetByName delete a cluster from clusterSet by name and -// delete the corresponding restclient from the map clusterKubeClientMap +// delete the corresponding restclient from the map clusterKubeClientMap. +// Caller must make sure that they hold the mutex func (cc *ClusterController) delFromClusterSetByName(clusterName string) { glog.V(1).Infof("ClusterController observed a cluster deletion: %v", clusterName) cc.knownClusterSet.Delete(clusterName) delete(cc.clusterKubeClientMap, clusterName) } -// addToClusterSet insert the new cluster to clusterSet and create a corresponding -// restclient to map clusterKubeClientMap func (cc *ClusterController) addToClusterSet(obj interface{}) { + cc.mu.Lock() + defer cc.mu.Unlock() cluster := obj.(*federationv1beta1.Cluster) + cc.addToClusterSetWithoutLock(cluster) +} + +// addToClusterSetWithoutLock inserts the new cluster to clusterSet and create +// a corresponding restclient to map clusterKubeClientMap if the cluster is not +// known. Caller must make sure that they hold the mutex. +func (cc *ClusterController) addToClusterSetWithoutLock(cluster *federationv1beta1.Cluster) { + if cc.knownClusterSet.Has(cluster.Name) { + return + } glog.V(1).Infof("ClusterController observed a new cluster: %v", cluster.Name) cc.knownClusterSet.Insert(cluster.Name) // create the restclient of cluster @@ -134,7 +148,9 @@ func (cc *ClusterController) Run(stopChan <-chan struct{}) { } func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) { + cc.mu.RLock() clusterClient, found := cc.clusterKubeClientMap[cluster.Name] + cc.mu.RUnlock() client := &clusterClient if !found { glog.Infof("It's a new cluster, a cluster client will be created") @@ -163,11 +179,12 @@ func (cc *ClusterController) UpdateClusterStatus() error { if err != nil { return err } + + cc.mu.Lock() for _, cluster := range clusters.Items { - if !cc.knownClusterSet.Has(cluster.Name) { - cc.addToClusterSet(&cluster) - } + cc.addToClusterSetWithoutLock(&cluster) } + cc.mu.Unlock() // If there's a difference between lengths of known clusters and observed clusters if len(cc.knownClusterSet) != len(clusters.Items) { @@ -176,9 +193,12 @@ func (cc *ClusterController) UpdateClusterStatus() error { observedSet.Insert(cluster.Name) } deleted := cc.knownClusterSet.Difference(observedSet) + + cc.mu.Lock() for clusterName := range deleted { cc.delFromClusterSetByName(clusterName) } + cc.mu.Unlock() } for _, cluster := range clusters.Items { clusterStatusNew, err := cc.GetClusterStatus(&cluster) @@ -186,7 +206,9 @@ func (cc *ClusterController) UpdateClusterStatus() error { glog.Infof("Failed to Get the status of cluster: %v", cluster.Name) continue } + cc.mu.RLock() clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name] + cc.mu.RUnlock() if !found { glog.Infof("There is no status stored for cluster: %v before", cluster.Name) @@ -210,7 +232,10 @@ func (cc *ClusterController) UpdateClusterStatus() error { } } } + + cc.mu.RLock() clusterClient, found := cc.clusterKubeClientMap[cluster.Name] + cc.mu.RUnlock() if !found { glog.Warningf("Failed to get client for cluster %s", cluster.Name) continue @@ -224,7 +249,9 @@ func (cc *ClusterController) UpdateClusterStatus() error { clusterStatusNew.Zones = zones clusterStatusNew.Region = region } + cc.mu.Lock() cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew + cc.mu.Unlock() cluster.Status = *clusterStatusNew cluster, err := cc.federationClient.Federation().Clusters().UpdateStatus(&cluster) if err != nil { diff --git a/federation/pkg/federation-controller/cluster/clustercontroller_test.go b/federation/pkg/federation-controller/cluster/clustercontroller_test.go index c2db065a446..3b4c34c9bee 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller_test.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller_test.go @@ -150,3 +150,49 @@ func TestUpdateClusterStatusOK(t *testing.T) { // Reset KubeconfigGetterForSecret controllerutil.KubeconfigGetterForSecret = originalGetter } + +// Test races between informer's updates and routine updates of cluster status +// Issue https://github.com/kubernetes/kubernetes/issues/49958 +func TestUpdateClusterRace(t *testing.T) { + clusterName := "foobarCluster" + // create dummy httpserver + testClusterServer := httptest.NewServer(createHttptestFakeHandlerForCluster(true)) + defer testClusterServer.Close() + federationCluster := newCluster(clusterName, testClusterServer.URL) + federationClusterList := newClusterList(federationCluster) + + testFederationServer := httptest.NewServer(createHttptestFakeHandlerForFederation(federationClusterList, true)) + defer testFederationServer.Close() + + restClientCfg, err := clientcmd.BuildConfigFromFlags(testFederationServer.URL, "") + if err != nil { + t.Errorf("Failed to build client config") + } + federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) + + // Override KubeconfigGetterForSecret to avoid having to setup service accounts and mount files with secret tokens. + originalGetter := controllerutil.KubeconfigGetterForSecret + controllerutil.KubeconfigGetterForSecret = func(s *api.Secret) clientcmd.KubeconfigGetter { + return func() (*clientcmdapi.Config, error) { + return &clientcmdapi.Config{}, nil + } + } + + manager := newClusterController(federationClientSet, 5) + + go func() { + for { + manager.UpdateClusterStatus() + } + }() + + // try to trigger the race in UpdateClusterStatus + for i := 0; i < 10; i++ { + manager.GetClusterClient(federationCluster) + manager.addToClusterSet(federationCluster) + manager.delFromClusterSet(federationCluster) + } + + // Reset KubeconfigGetterForSecret + controllerutil.KubeconfigGetterForSecret = originalGetter +}