mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Refactor updateClusterStatus to reduce locking
This commit is contained in:
parent
9f7ddb6409
commit
88e64e61ce
@ -107,6 +107,15 @@ func (self *ClusterClient) GetClusterHealthStatus() *federation_v1beta1.ClusterS
|
||||
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition)
|
||||
}
|
||||
}
|
||||
|
||||
zones, region, err := self.GetClusterZones()
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to get zones and region for cluster with client %v: %v", self, err)
|
||||
} else {
|
||||
clusterStatus.Zones = zones
|
||||
clusterStatus.Region = region
|
||||
}
|
||||
|
||||
return &clusterStatus
|
||||
}
|
||||
|
||||
|
@ -108,6 +108,7 @@ func (cc *ClusterController) delFromClusterSetByName(clusterName string) {
|
||||
glog.V(1).Infof("ClusterController observed a cluster deletion: %v", clusterName)
|
||||
cc.knownClusterSet.Delete(clusterName)
|
||||
delete(cc.clusterKubeClientMap, clusterName)
|
||||
delete(cc.clusterClusterStatusMap, clusterName)
|
||||
}
|
||||
|
||||
func (cc *ClusterController) addToClusterSet(obj interface{}) {
|
||||
@ -141,77 +142,37 @@ func (cc *ClusterController) Run(stopChan <-chan struct{}) {
|
||||
go cc.clusterController.Run(stopChan)
|
||||
// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
|
||||
go wait.Until(func() {
|
||||
if err := cc.UpdateClusterStatus(); err != nil {
|
||||
if err := cc.updateClusterStatus(); err != nil {
|
||||
glog.Errorf("Error monitoring cluster status: %v", err)
|
||||
}
|
||||
}, cc.clusterMonitorPeriod, stopChan)
|
||||
}
|
||||
|
||||
func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) {
|
||||
cc.mu.RLock()
|
||||
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
|
||||
cc.mu.RUnlock()
|
||||
client := &clusterClient
|
||||
if !found {
|
||||
glog.Infof("It's a new cluster, a cluster client will be created")
|
||||
client, err := NewClusterClientSet(cluster)
|
||||
if err != nil || client == nil {
|
||||
glog.Errorf("Failed to create cluster client, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (cc *ClusterController) GetClusterStatus(cluster *federationv1beta1.Cluster) (*federationv1beta1.ClusterStatus, error) {
|
||||
// just get the status of cluster, by requesting the restapi "/healthz"
|
||||
clusterClient, err := cc.GetClusterClient(cluster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clusterStatus := clusterClient.GetClusterHealthStatus()
|
||||
return clusterStatus, nil
|
||||
}
|
||||
|
||||
// UpdateClusterStatus checks cluster status and get the metrics from cluster's restapi
|
||||
func (cc *ClusterController) UpdateClusterStatus() error {
|
||||
// updateClusterStatus checks cluster status and get the metrics from cluster's restapi
|
||||
func (cc *ClusterController) updateClusterStatus() error {
|
||||
clusters, err := cc.federationClient.Federation().Clusters().List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cc.mu.Lock()
|
||||
for _, cluster := range clusters.Items {
|
||||
cc.addToClusterSetWithoutLock(&cluster)
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
|
||||
// If there's a difference between lengths of known clusters and observed clusters
|
||||
if len(cc.knownClusterSet) != len(clusters.Items) {
|
||||
observedSet := make(sets.String)
|
||||
for _, cluster := range clusters.Items {
|
||||
observedSet.Insert(cluster.Name)
|
||||
}
|
||||
deleted := cc.knownClusterSet.Difference(observedSet)
|
||||
|
||||
cc.mu.Lock()
|
||||
for clusterName := range deleted {
|
||||
cc.delFromClusterSetByName(clusterName)
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
}
|
||||
for _, cluster := range clusters.Items {
|
||||
clusterStatusNew, err := cc.GetClusterStatus(&cluster)
|
||||
if err != nil {
|
||||
glog.Infof("Failed to Get the status of cluster: %v", cluster.Name)
|
||||
cc.mu.RLock()
|
||||
// skip updating status of the cluster which is not yet added to knownClusterSet.
|
||||
if !cc.knownClusterSet.Has(cluster.Name) {
|
||||
cc.mu.RUnlock()
|
||||
continue
|
||||
}
|
||||
cc.mu.RLock()
|
||||
clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name]
|
||||
clusterClient, clientFound := cc.clusterKubeClientMap[cluster.Name]
|
||||
clusterStatusOld, statusFound := cc.clusterClusterStatusMap[cluster.Name]
|
||||
cc.mu.RUnlock()
|
||||
if !found {
|
||||
glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
|
||||
|
||||
if !clientFound {
|
||||
glog.Warningf("Failed to get client for cluster %s", cluster.Name)
|
||||
continue
|
||||
}
|
||||
clusterStatusNew := clusterClient.GetClusterHealthStatus()
|
||||
if !statusFound {
|
||||
glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
|
||||
} else {
|
||||
hasTransition := false
|
||||
if len(clusterStatusNew.Conditions) != len(clusterStatusOld.Conditions) {
|
||||
@ -233,22 +194,6 @@ func (cc *ClusterController) UpdateClusterStatus() error {
|
||||
}
|
||||
}
|
||||
|
||||
cc.mu.RLock()
|
||||
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
|
||||
cc.mu.RUnlock()
|
||||
if !found {
|
||||
glog.Warningf("Failed to get client for cluster %s", cluster.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
zones, region, err := clusterClient.GetClusterZones()
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to get zones and region for cluster %s: %v", cluster.Name, err)
|
||||
// Don't return err here, as we want the rest of the status update to proceed.
|
||||
} else {
|
||||
clusterStatusNew.Zones = zones
|
||||
clusterStatusNew.Region = region
|
||||
}
|
||||
cc.mu.Lock()
|
||||
cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew
|
||||
cc.mu.Unlock()
|
||||
|
@ -134,7 +134,7 @@ func TestUpdateClusterStatusOK(t *testing.T) {
|
||||
}
|
||||
|
||||
manager := newClusterController(federationClientSet, 5)
|
||||
err = manager.UpdateClusterStatus()
|
||||
err = manager.updateClusterStatus()
|
||||
if err != nil {
|
||||
t.Errorf("Failed to Update Cluster Status: %v", err)
|
||||
}
|
||||
@ -188,7 +188,6 @@ func TestUpdateClusterRace(t *testing.T) {
|
||||
|
||||
// try to trigger the race in UpdateClusterStatus
|
||||
for i := 0; i < 10; i++ {
|
||||
manager.GetClusterClient(federationCluster)
|
||||
manager.addToClusterSet(federationCluster)
|
||||
manager.delFromClusterSet(federationCluster)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user