Add an exponential backoff for reading log sizes

This commit is contained in:
gmarek 2015-12-29 10:10:17 +01:00
parent b00b41bc7f
commit 2388787c38

View File

@ -83,8 +83,9 @@ type LogsSizeData struct {
// WorkItem is a command for a worker that contains the IP of the machine from which we want to // WorkItem is a command for a worker that contains the IP of the machine from which we want to
// gather data and paths to all files we're interested in. // gather data and paths to all files we're interested in.
type WorkItem struct { type WorkItem struct {
ip string ip string
paths []string paths []string
backoffMultiplier int
} }
func prepareData(masterAddress string, nodeAddresses []string) LogsSizeData { func prepareData(masterAddress string, nodeAddresses []string) LogsSizeData {
@ -171,13 +172,15 @@ func (v *LogsSizeVerifier) PrintData() string {
// Run starts log size gathering. It starts a goroutine for every worker and then blocks until stopChannel is closed // Run starts log size gathering. It starts a goroutine for every worker and then blocks until stopChannel is closed
func (v *LogsSizeVerifier) Run() { func (v *LogsSizeVerifier) Run() {
v.workChannel <- WorkItem{ v.workChannel <- WorkItem{
ip: v.masterAddress, ip: v.masterAddress,
paths: masterLogsToCheck, paths: masterLogsToCheck,
backoffMultiplier: 1,
} }
for _, node := range v.nodeAddresses { for _, node := range v.nodeAddresses {
v.workChannel <- WorkItem{ v.workChannel <- WorkItem{
ip: node, ip: node,
paths: nodeLogsToCheck, paths: nodeLogsToCheck,
backoffMultiplier: 1,
} }
} }
for _, worker := range v.workers { for _, worker := range v.workers {
@ -212,9 +215,13 @@ func (g *LogSizeGatherer) Work() bool {
) )
if err != nil { if err != nil {
Logf("Error while trying to SSH to %v, skipping probe. Error: %v", workItem.ip, err) Logf("Error while trying to SSH to %v, skipping probe. Error: %v", workItem.ip, err)
if workItem.backoffMultiplier < 128 {
workItem.backoffMultiplier *= 2
}
g.workChannel <- workItem g.workChannel <- workItem
return true return true
} }
workItem.backoffMultiplier = 1
results := strings.Split(sshResult.Stdout, " ") results := strings.Split(sshResult.Stdout, " ")
now := time.Now() now := time.Now()
@ -228,8 +235,12 @@ func (g *LogSizeGatherer) Work() bool {
g.data.AddNewData(workItem.ip, path, now, size) g.data.AddNewData(workItem.ip, path, now, size)
} }
go func() { go func() {
time.Sleep(pollingPeriod) select {
g.workChannel <- workItem case <-time.After(time.Duration(workItem.backoffMultiplier) * pollingPeriod):
g.workChannel <- workItem
case <-g.stopChannel:
return
}
}() }()
return true return true
} }