Sync entries in batches just as before (using uploadIntervalSec parameter) (#477)

* Sync entries in batches just as before (using `uploadIntervalSec` parameter)

* Replace `lastTimeSynced` value with `time.Time{}`

Since it will be overwritten by the very first iteration.
This commit is contained in:
M. Mert Yıldıran 2021-11-17 15:16:49 +03:00 committed by GitHub
parent bb85312b9f
commit a13fec3dae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -227,6 +227,10 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
connection.Close() connection.Close()
}() }()
lastTimeSynced := time.Time{}
batch := make([]har.Entry, 0)
handleDataChannel := func(wg *sync.WaitGroup, connection *basenine.Connection, data chan []byte) { handleDataChannel := func(wg *sync.WaitGroup, connection *basenine.Connection, data chan []byte) {
defer wg.Done() defer wg.Done()
for { for {
@ -239,7 +243,6 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
var dataMap map[string]interface{} var dataMap map[string]interface{}
err = json.Unmarshal(dataBytes, &dataMap) err = json.Unmarshal(dataBytes, &dataMap)
result := make([]har.Entry, 0)
var entry tapApi.MizuEntry var entry tapApi.MizuEntry
if err := json.Unmarshal([]byte(dataBytes), &entry); err != nil { if err := json.Unmarshal([]byte(dataBytes), &entry); err != nil {
continue continue
@ -261,14 +264,22 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
continue continue
} }
result = append(result, *harEntry) batch = append(batch, *harEntry)
body, jMarshalErr := json.Marshal(result) now := time.Now()
if lastTimeSynced.Add(time.Duration(uploadIntervalSec) * time.Second).After(now) {
continue
}
lastTimeSynced = now
body, jMarshalErr := json.Marshal(batch)
batchSize := len(batch)
if jMarshalErr != nil { if jMarshalErr != nil {
analyzeInformation.Reset() analyzeInformation.Reset()
logger.Log.Infof("Stopping sync entries") logger.Log.Infof("Stopping sync entries")
logger.Log.Fatal(jMarshalErr) logger.Log.Fatal(jMarshalErr)
} }
batch = make([]har.Entry, 0)
var in bytes.Buffer var in bytes.Buffer
w := zlib.NewWriter(&in) w := zlib.NewWriter(&in)
@ -293,7 +304,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
logger.Log.Info("Stopping sync entries") logger.Log.Info("Stopping sync entries")
logger.Log.Fatal(postErr) logger.Log.Fatal(postErr)
} }
analyzeInformation.SentCount += 1 analyzeInformation.SentCount += batchSize
if analyzeInformation.SentCount%SentCountLogInterval == 0 { if analyzeInformation.SentCount%SentCountLogInterval == 0 {
logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount) logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount)