diff --git a/agent/pkg/up9/main.go b/agent/pkg/up9/main.go index 225d3f60d..9f0a1d185 100644 --- a/agent/pkg/up9/main.go +++ b/agent/pkg/up9/main.go @@ -227,6 +227,10 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva connection.Close() }() + lastTimeSynced := time.Time{} + + batch := make([]har.Entry, 0) + handleDataChannel := func(wg *sync.WaitGroup, connection *basenine.Connection, data chan []byte) { defer wg.Done() for { @@ -239,7 +243,6 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva var dataMap map[string]interface{} err = json.Unmarshal(dataBytes, &dataMap) - result := make([]har.Entry, 0) var entry tapApi.MizuEntry if err := json.Unmarshal([]byte(dataBytes), &entry); err != nil { continue @@ -261,14 +264,22 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva continue } - result = append(result, *harEntry) + batch = append(batch, *harEntry) - body, jMarshalErr := json.Marshal(result) + now := time.Now() + if lastTimeSynced.Add(time.Duration(uploadIntervalSec) * time.Second).After(now) { + continue + } + lastTimeSynced = now + + body, jMarshalErr := json.Marshal(batch) + batchSize := len(batch) if jMarshalErr != nil { analyzeInformation.Reset() logger.Log.Infof("Stopping sync entries") logger.Log.Fatal(jMarshalErr) } + batch = make([]har.Entry, 0) var in bytes.Buffer w := zlib.NewWriter(&in) @@ -293,7 +304,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva logger.Log.Info("Stopping sync entries") logger.Log.Fatal(postErr) } - analyzeInformation.SentCount += 1 + analyzeInformation.SentCount += batchSize if analyzeInformation.SentCount%SentCountLogInterval == 0 { logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount)