feat: async calls (#311)

* feat: async calls

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* feat: added concurrency settings

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* feat: added in ability to set max concurrency

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

---------

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>
Co-authored-by: Matthis <99146727+matthisholleville@users.noreply.github.com>
This commit is contained in:
Alex Jones 2023-04-24 16:04:37 +02:00 committed by GitHub
parent 2391603075
commit c3cc413e7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 52 deletions

View File

@ -31,6 +31,7 @@ var (
nocache bool nocache bool
namespace string namespace string
anonymize bool anonymize bool
maxConcurrency int
) )
// AnalyzeCmd represents the problems command // AnalyzeCmd represents the problems command
@ -43,7 +44,8 @@ var AnalyzeCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
// AnalysisResult configuration // AnalysisResult configuration
config, err := analysis.NewAnalysis(backend, language, filters, namespace, nocache, explain) config, err := analysis.NewAnalysis(backend,
language, filters, namespace, nocache, explain, maxConcurrency)
if err != nil { if err != nil {
color.Red("Error: %v", err) color.Red("Error: %v", err)
os.Exit(1) os.Exit(1)
@ -92,4 +94,6 @@ func init() {
AnalyzeCmd.Flags().StringVarP(&output, "output", "o", "text", "Output format (text, json)") AnalyzeCmd.Flags().StringVarP(&output, "output", "o", "text", "Output format (text, json)")
// add language options for output // add language options for output
AnalyzeCmd.Flags().StringVarP(&language, "language", "l", "english", "Languages to use for AI (e.g. 'English', 'Spanish', 'French', 'German', 'Italian', 'Portuguese', 'Dutch', 'Russian', 'Chinese', 'Japanese', 'Korean')") AnalyzeCmd.Flags().StringVarP(&language, "language", "l", "english", "Languages to use for AI (e.g. 'English', 'Spanish', 'French', 'German', 'Italian', 'Portuguese', 'Dutch', 'Russian', 'Chinese', 'Japanese', 'Korean')")
// add max concurrency
AnalyzeCmd.Flags().IntVarP(&maxConcurrency, "max-concurrency", "m", 10, "Maximum number of concurrent requests to the Kubernetes API server")
} }

View File

@ -20,6 +20,7 @@ import (
"os" "os"
"reflect" "reflect"
"strings" "strings"
"sync"
"github.com/fatih/color" "github.com/fatih/color"
"github.com/k8sgpt-ai/k8sgpt/pkg/ai" "github.com/k8sgpt-ai/k8sgpt/pkg/ai"
@ -40,6 +41,7 @@ type Analysis struct {
Namespace string Namespace string
NoCache bool NoCache bool
Explain bool Explain bool
MaxConcurrency int
} }
type AnalysisStatus string type AnalysisStatus string
@ -55,7 +57,7 @@ type JsonOutput struct {
Results []common.Result `json:"results"` Results []common.Result `json:"results"`
} }
func NewAnalysis(backend string, language string, filters []string, namespace string, noCache bool, explain bool) (*Analysis, error) { func NewAnalysis(backend string, language string, filters []string, namespace string, noCache bool, explain bool, maxConcurrency int) (*Analysis, error) {
var configAI ai.AIConfiguration var configAI ai.AIConfiguration
err := viper.UnmarshalKey("ai", &configAI) err := viper.UnmarshalKey("ai", &configAI)
if err != nil { if err != nil {
@ -106,6 +108,7 @@ func NewAnalysis(backend string, language string, filters []string, namespace st
Namespace: namespace, Namespace: namespace,
NoCache: noCache, NoCache: noCache,
Explain: explain, Explain: explain,
MaxConcurrency: maxConcurrency,
}, nil }, nil
} }
@ -122,45 +125,86 @@ func (a *Analysis) RunAnalysis() []error {
} }
var errorList []error var errorList []error
semaphore := make(chan struct{}, a.MaxConcurrency)
// if there are no filters selected and no active_filters then run all of them // if there are no filters selected and no active_filters then run all of them
if len(a.Filters) == 0 && len(activeFilters) == 0 { if len(a.Filters) == 0 && len(activeFilters) == 0 {
var wg sync.WaitGroup
var mutex sync.Mutex
for _, analyzer := range analyzerMap { for _, analyzer := range analyzerMap {
wg.Add(1)
semaphore <- struct{}{}
go func(analyzer common.IAnalyzer, wg *sync.WaitGroup, semaphore chan struct{}) {
defer wg.Done()
results, err := analyzer.Analyze(analyzerConfig) results, err := analyzer.Analyze(analyzerConfig)
if err != nil { if err != nil {
errorList = append(errorList, errors.New(fmt.Sprintf("[%s] %s", reflect.TypeOf(analyzer).Name(), err))) mutex.Lock()
errorList = append(errorList, fmt.Errorf(fmt.Sprintf("[%s] %s", reflect.TypeOf(analyzer).Name(), err)))
mutex.Unlock()
} }
mutex.Lock()
a.Results = append(a.Results, results...) a.Results = append(a.Results, results...)
mutex.Unlock()
<-semaphore
}(analyzer, &wg, semaphore)
} }
wg.Wait()
return errorList return errorList
} }
semaphore = make(chan struct{}, a.MaxConcurrency)
// if the filters flag is specified // if the filters flag is specified
if len(a.Filters) != 0 { if len(a.Filters) != 0 {
var wg sync.WaitGroup
var mutex sync.Mutex
for _, filter := range a.Filters { for _, filter := range a.Filters {
if analyzer, ok := analyzerMap[filter]; ok { if analyzer, ok := analyzerMap[filter]; ok {
semaphore <- struct{}{}
wg.Add(1)
go func(analyzer common.IAnalyzer) {
defer wg.Done()
results, err := analyzer.Analyze(analyzerConfig) results, err := analyzer.Analyze(analyzerConfig)
if err != nil { if err != nil {
errorList = append(errorList, errors.New(fmt.Sprintf("[%s] %s", filter, err))) mutex.Lock()
errorList = append(errorList, fmt.Errorf(fmt.Sprintf("[%s] %s", filter, err)))
mutex.Unlock()
} }
mutex.Lock()
a.Results = append(a.Results, results...) a.Results = append(a.Results, results...)
mutex.Unlock()
<-semaphore
}(analyzer)
} else { } else {
errorList = append(errorList, errors.New(fmt.Sprintf("\"%s\" filter does not exist. Please run k8sgpt filters list.", filter))) errorList = append(errorList, fmt.Errorf(fmt.Sprintf("\"%s\" filter does not exist. Please run k8sgpt filters list.", filter)))
} }
} }
wg.Wait()
return errorList return errorList
} }
var wg sync.WaitGroup
var mutex sync.Mutex
semaphore = make(chan struct{}, a.MaxConcurrency)
// use active_filters // use active_filters
for _, filter := range activeFilters { for _, filter := range activeFilters {
if analyzer, ok := analyzerMap[filter]; ok { if analyzer, ok := analyzerMap[filter]; ok {
semaphore <- struct{}{}
wg.Add(1)
go func(analyzer common.IAnalyzer) {
defer wg.Done()
results, err := analyzer.Analyze(analyzerConfig) results, err := analyzer.Analyze(analyzerConfig)
if err != nil { if err != nil {
errorList = append(errorList, errors.New(fmt.Sprintf("[%s] %s", filter, err))) mutex.Lock()
errorList = append(errorList, fmt.Errorf("[%s] %s", filter, err))
mutex.Unlock()
} }
mutex.Lock()
a.Results = append(a.Results, results...) a.Results = append(a.Results, results...)
mutex.Unlock()
<-semaphore
}(analyzer)
} }
} }
wg.Wait()
return errorList return errorList
} }

View File

@ -31,6 +31,7 @@ type Config struct {
Key string Key string
Token string Token string
Output string Output string
maxConcurrency int
} }
type Health struct { type Health struct {
@ -55,13 +56,19 @@ func (s *Config) analyzeHandler(w http.ResponseWriter, r *http.Request) {
anonymize := getBoolParam(r.URL.Query().Get("anonymize")) anonymize := getBoolParam(r.URL.Query().Get("anonymize"))
nocache := getBoolParam(r.URL.Query().Get("nocache")) nocache := getBoolParam(r.URL.Query().Get("nocache"))
language := r.URL.Query().Get("language") language := r.URL.Query().Get("language")
var err error
s.maxConcurrency, err = strconv.Atoi(r.URL.Query().Get("maxConcurrency"))
if err != nil {
s.maxConcurrency = 10
}
s.Output = r.URL.Query().Get("output") s.Output = r.URL.Query().Get("output")
if s.Output == "" { if s.Output == "" {
s.Output = "json" s.Output = "json"
} }
config, err := analysis.NewAnalysis(s.Backend, language, []string{}, namespace, nocache, explain) config, err := analysis.NewAnalysis(s.Backend, language, []string{}, namespace, nocache, explain, s.maxConcurrency)
if err != nil { if err != nil {
health.Failure++ health.Failure++
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)