mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #49985 from dqminh/fix-race-federation-controller
Automatic merge from submit-queue federation-controller/clustercontroller: fix race when updating data Fix #49958 Updates for ClusterController's maps were made without locking which can lead to race conditions. This change adds a RWMutex to protect the data. We lock and unlock them whenever the data is accessed to make sure that we dont slow down too much
This commit is contained in:
commit
a826c378e5
@ -18,6 +18,7 @@ package cluster
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -36,16 +37,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type ClusterController struct {
|
type ClusterController struct {
|
||||||
knownClusterSet sets.String
|
|
||||||
|
|
||||||
// federationClient used to operate cluster
|
// federationClient used to operate cluster
|
||||||
federationClient federationclientset.Interface
|
federationClient federationclientset.Interface
|
||||||
|
|
||||||
// clusterMonitorPeriod is the period for updating status of cluster
|
// clusterMonitorPeriod is the period for updating status of cluster
|
||||||
clusterMonitorPeriod time.Duration
|
clusterMonitorPeriod time.Duration
|
||||||
|
|
||||||
|
mu sync.RWMutex
|
||||||
|
knownClusterSet sets.String
|
||||||
// clusterClusterStatusMap is a mapping of clusterName and cluster status of last sampling
|
// clusterClusterStatusMap is a mapping of clusterName and cluster status of last sampling
|
||||||
clusterClusterStatusMap map[string]federationv1beta1.ClusterStatus
|
clusterClusterStatusMap map[string]federationv1beta1.ClusterStatus
|
||||||
|
|
||||||
// clusterKubeClientMap is a mapping of clusterName and restclient
|
// clusterKubeClientMap is a mapping of clusterName and restclient
|
||||||
clusterKubeClientMap map[string]ClusterClient
|
clusterKubeClientMap map[string]ClusterClient
|
||||||
|
|
||||||
@ -94,22 +95,35 @@ func newClusterController(federationClient federationclientset.Interface, cluste
|
|||||||
// delFromClusterSet delete a cluster from clusterSet and
|
// delFromClusterSet delete a cluster from clusterSet and
|
||||||
// delete the corresponding restclient from the map clusterKubeClientMap
|
// delete the corresponding restclient from the map clusterKubeClientMap
|
||||||
func (cc *ClusterController) delFromClusterSet(obj interface{}) {
|
func (cc *ClusterController) delFromClusterSet(obj interface{}) {
|
||||||
|
cc.mu.Lock()
|
||||||
|
defer cc.mu.Unlock()
|
||||||
cluster := obj.(*federationv1beta1.Cluster)
|
cluster := obj.(*federationv1beta1.Cluster)
|
||||||
cc.delFromClusterSetByName(cluster.Name)
|
cc.delFromClusterSetByName(cluster.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// delFromClusterSetByName delete a cluster from clusterSet by name and
|
// delFromClusterSetByName delete a cluster from clusterSet by name and
|
||||||
// delete the corresponding restclient from the map clusterKubeClientMap
|
// delete the corresponding restclient from the map clusterKubeClientMap.
|
||||||
|
// Caller must make sure that they hold the mutex
|
||||||
func (cc *ClusterController) delFromClusterSetByName(clusterName string) {
|
func (cc *ClusterController) delFromClusterSetByName(clusterName string) {
|
||||||
glog.V(1).Infof("ClusterController observed a cluster deletion: %v", clusterName)
|
glog.V(1).Infof("ClusterController observed a cluster deletion: %v", clusterName)
|
||||||
cc.knownClusterSet.Delete(clusterName)
|
cc.knownClusterSet.Delete(clusterName)
|
||||||
delete(cc.clusterKubeClientMap, clusterName)
|
delete(cc.clusterKubeClientMap, clusterName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// addToClusterSet insert the new cluster to clusterSet and create a corresponding
|
|
||||||
// restclient to map clusterKubeClientMap
|
|
||||||
func (cc *ClusterController) addToClusterSet(obj interface{}) {
|
func (cc *ClusterController) addToClusterSet(obj interface{}) {
|
||||||
|
cc.mu.Lock()
|
||||||
|
defer cc.mu.Unlock()
|
||||||
cluster := obj.(*federationv1beta1.Cluster)
|
cluster := obj.(*federationv1beta1.Cluster)
|
||||||
|
cc.addToClusterSetWithoutLock(cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
// addToClusterSetWithoutLock inserts the new cluster to clusterSet and create
|
||||||
|
// a corresponding restclient to map clusterKubeClientMap if the cluster is not
|
||||||
|
// known. Caller must make sure that they hold the mutex.
|
||||||
|
func (cc *ClusterController) addToClusterSetWithoutLock(cluster *federationv1beta1.Cluster) {
|
||||||
|
if cc.knownClusterSet.Has(cluster.Name) {
|
||||||
|
return
|
||||||
|
}
|
||||||
glog.V(1).Infof("ClusterController observed a new cluster: %v", cluster.Name)
|
glog.V(1).Infof("ClusterController observed a new cluster: %v", cluster.Name)
|
||||||
cc.knownClusterSet.Insert(cluster.Name)
|
cc.knownClusterSet.Insert(cluster.Name)
|
||||||
// create the restclient of cluster
|
// create the restclient of cluster
|
||||||
@ -134,7 +148,9 @@ func (cc *ClusterController) Run(stopChan <-chan struct{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) {
|
func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) {
|
||||||
|
cc.mu.RLock()
|
||||||
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
|
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
|
||||||
|
cc.mu.RUnlock()
|
||||||
client := &clusterClient
|
client := &clusterClient
|
||||||
if !found {
|
if !found {
|
||||||
glog.Infof("It's a new cluster, a cluster client will be created")
|
glog.Infof("It's a new cluster, a cluster client will be created")
|
||||||
@ -163,11 +179,12 @@ func (cc *ClusterController) UpdateClusterStatus() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cc.mu.Lock()
|
||||||
for _, cluster := range clusters.Items {
|
for _, cluster := range clusters.Items {
|
||||||
if !cc.knownClusterSet.Has(cluster.Name) {
|
cc.addToClusterSetWithoutLock(&cluster)
|
||||||
cc.addToClusterSet(&cluster)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
cc.mu.Unlock()
|
||||||
|
|
||||||
// If there's a difference between lengths of known clusters and observed clusters
|
// If there's a difference between lengths of known clusters and observed clusters
|
||||||
if len(cc.knownClusterSet) != len(clusters.Items) {
|
if len(cc.knownClusterSet) != len(clusters.Items) {
|
||||||
@ -176,9 +193,12 @@ func (cc *ClusterController) UpdateClusterStatus() error {
|
|||||||
observedSet.Insert(cluster.Name)
|
observedSet.Insert(cluster.Name)
|
||||||
}
|
}
|
||||||
deleted := cc.knownClusterSet.Difference(observedSet)
|
deleted := cc.knownClusterSet.Difference(observedSet)
|
||||||
|
|
||||||
|
cc.mu.Lock()
|
||||||
for clusterName := range deleted {
|
for clusterName := range deleted {
|
||||||
cc.delFromClusterSetByName(clusterName)
|
cc.delFromClusterSetByName(clusterName)
|
||||||
}
|
}
|
||||||
|
cc.mu.Unlock()
|
||||||
}
|
}
|
||||||
for _, cluster := range clusters.Items {
|
for _, cluster := range clusters.Items {
|
||||||
clusterStatusNew, err := cc.GetClusterStatus(&cluster)
|
clusterStatusNew, err := cc.GetClusterStatus(&cluster)
|
||||||
@ -186,7 +206,9 @@ func (cc *ClusterController) UpdateClusterStatus() error {
|
|||||||
glog.Infof("Failed to Get the status of cluster: %v", cluster.Name)
|
glog.Infof("Failed to Get the status of cluster: %v", cluster.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
cc.mu.RLock()
|
||||||
clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name]
|
clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name]
|
||||||
|
cc.mu.RUnlock()
|
||||||
if !found {
|
if !found {
|
||||||
glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
|
glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
|
||||||
|
|
||||||
@ -210,7 +232,10 @@ func (cc *ClusterController) UpdateClusterStatus() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cc.mu.RLock()
|
||||||
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
|
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
|
||||||
|
cc.mu.RUnlock()
|
||||||
if !found {
|
if !found {
|
||||||
glog.Warningf("Failed to get client for cluster %s", cluster.Name)
|
glog.Warningf("Failed to get client for cluster %s", cluster.Name)
|
||||||
continue
|
continue
|
||||||
@ -224,7 +249,9 @@ func (cc *ClusterController) UpdateClusterStatus() error {
|
|||||||
clusterStatusNew.Zones = zones
|
clusterStatusNew.Zones = zones
|
||||||
clusterStatusNew.Region = region
|
clusterStatusNew.Region = region
|
||||||
}
|
}
|
||||||
|
cc.mu.Lock()
|
||||||
cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew
|
cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew
|
||||||
|
cc.mu.Unlock()
|
||||||
cluster.Status = *clusterStatusNew
|
cluster.Status = *clusterStatusNew
|
||||||
cluster, err := cc.federationClient.Federation().Clusters().UpdateStatus(&cluster)
|
cluster, err := cc.federationClient.Federation().Clusters().UpdateStatus(&cluster)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -150,3 +150,49 @@ func TestUpdateClusterStatusOK(t *testing.T) {
|
|||||||
// Reset KubeconfigGetterForSecret
|
// Reset KubeconfigGetterForSecret
|
||||||
controllerutil.KubeconfigGetterForSecret = originalGetter
|
controllerutil.KubeconfigGetterForSecret = originalGetter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test races between informer's updates and routine updates of cluster status
|
||||||
|
// Issue https://github.com/kubernetes/kubernetes/issues/49958
|
||||||
|
func TestUpdateClusterRace(t *testing.T) {
|
||||||
|
clusterName := "foobarCluster"
|
||||||
|
// create dummy httpserver
|
||||||
|
testClusterServer := httptest.NewServer(createHttptestFakeHandlerForCluster(true))
|
||||||
|
defer testClusterServer.Close()
|
||||||
|
federationCluster := newCluster(clusterName, testClusterServer.URL)
|
||||||
|
federationClusterList := newClusterList(federationCluster)
|
||||||
|
|
||||||
|
testFederationServer := httptest.NewServer(createHttptestFakeHandlerForFederation(federationClusterList, true))
|
||||||
|
defer testFederationServer.Close()
|
||||||
|
|
||||||
|
restClientCfg, err := clientcmd.BuildConfigFromFlags(testFederationServer.URL, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Failed to build client config")
|
||||||
|
}
|
||||||
|
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
|
||||||
|
|
||||||
|
// Override KubeconfigGetterForSecret to avoid having to setup service accounts and mount files with secret tokens.
|
||||||
|
originalGetter := controllerutil.KubeconfigGetterForSecret
|
||||||
|
controllerutil.KubeconfigGetterForSecret = func(s *api.Secret) clientcmd.KubeconfigGetter {
|
||||||
|
return func() (*clientcmdapi.Config, error) {
|
||||||
|
return &clientcmdapi.Config{}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
manager := newClusterController(federationClientSet, 5)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
manager.UpdateClusterStatus()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// try to trigger the race in UpdateClusterStatus
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
manager.GetClusterClient(federationCluster)
|
||||||
|
manager.addToClusterSet(federationCluster)
|
||||||
|
manager.delFromClusterSet(federationCluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset KubeconfigGetterForSecret
|
||||||
|
controllerutil.KubeconfigGetterForSecret = originalGetter
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user