mirror of
https://github.com/k8sgpt-ai/k8sgpt.git
synced 2025-09-25 15:00:34 +00:00
feat: custom analysis paralelism (#1203)
This commit is contained in:
@@ -164,23 +164,39 @@ func (a *Analysis) RunCustomAnalysis() {
|
||||
var customAnalyzers []custom.CustomAnalyzer
|
||||
if err := viper.UnmarshalKey("custom_analyzers", &customAnalyzers); err != nil {
|
||||
a.Errors = append(a.Errors, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
semaphore := make(chan struct{}, a.MaxConcurrency)
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
for _, cAnalyzer := range customAnalyzers {
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{}
|
||||
go func(analyzer custom.CustomAnalyzer, wg *sync.WaitGroup, semaphore chan struct{}) {
|
||||
defer wg.Done()
|
||||
canClient, err := custom.NewClient(cAnalyzer.Connection)
|
||||
if err != nil {
|
||||
mutex.Lock()
|
||||
a.Errors = append(a.Errors, fmt.Sprintf("Client creation error for %s analyzer", cAnalyzer.Name))
|
||||
mutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
canClient, err := custom.NewClient(cAnalyzer.Connection)
|
||||
if err != nil {
|
||||
a.Errors = append(a.Errors, fmt.Sprintf("Client creation error for %s analyzer", cAnalyzer.Name))
|
||||
continue
|
||||
}
|
||||
|
||||
result, err := canClient.Run()
|
||||
if err != nil {
|
||||
a.Errors = append(a.Errors, fmt.Sprintf("[%s] %s", cAnalyzer.Name, err))
|
||||
} else {
|
||||
a.Results = append(a.Results, result)
|
||||
}
|
||||
result, err := canClient.Run()
|
||||
if err != nil {
|
||||
mutex.Lock()
|
||||
a.Errors = append(a.Errors, fmt.Sprintf("[%s] %s", cAnalyzer.Name, err))
|
||||
mutex.Unlock()
|
||||
} else {
|
||||
mutex.Lock()
|
||||
a.Results = append(a.Results, result)
|
||||
mutex.Unlock()
|
||||
}
|
||||
<-semaphore
|
||||
}(cAnalyzer, &wg, semaphore)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (a *Analysis) RunAnalysis() {
|
||||
@@ -209,10 +225,10 @@ func (a *Analysis) RunAnalysis() {
|
||||
}
|
||||
|
||||
semaphore := make(chan struct{}, a.MaxConcurrency)
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
// if there are no filters selected and no active_filters then run coreAnalyzer
|
||||
if len(a.Filters) == 0 && len(activeFilters) == 0 {
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
for _, analyzer := range coreAnalyzerMap {
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{}
|
||||
@@ -234,11 +250,8 @@ func (a *Analysis) RunAnalysis() {
|
||||
wg.Wait()
|
||||
return
|
||||
}
|
||||
semaphore = make(chan struct{}, a.MaxConcurrency)
|
||||
// if the filters flag is specified
|
||||
if len(a.Filters) != 0 {
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
for _, filter := range a.Filters {
|
||||
if analyzer, ok := analyzerMap[filter]; ok {
|
||||
semaphore <- struct{}{}
|
||||
@@ -264,9 +277,6 @@ func (a *Analysis) RunAnalysis() {
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
semaphore = make(chan struct{}, a.MaxConcurrency)
|
||||
// use active_filters
|
||||
for _, filter := range activeFilters {
|
||||
if analyzer, ok := analyzerMap[filter]; ok {
|
||||
|
Reference in New Issue
Block a user