fixed crash when channel is closed (#273)

This commit is contained in:
RoyUP9 2021-09-14 11:53:47 +03:00 committed by GitHub
parent 7dca1ad889
commit d5b01347df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -379,13 +379,28 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
for { for {
select { select {
case pod := <-added: case pod, ok := <-added:
if !ok {
added = nil
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn() restartTappersDebouncer.SetOn()
case pod := <-removed: case pod, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace) logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn() restartTappersDebouncer.SetOn()
case pod := <-modified: case pod, ok := <-modified:
if !ok {
modified = nil
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP) logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address. // Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events: // After filtering for IPs, on a normal pod restart this includes the following events:
@ -396,8 +411,12 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
if pod.Status.PodIP != "" { if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn() restartTappersDebouncer.SetOn()
} }
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
case err := <-errorChan:
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err) logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel() restartTappersDebouncer.Cancel()
// TODO: Does this also perform cleanup? // TODO: Does this also perform cleanup?
@ -477,21 +496,28 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
timeAfter := time.After(25 * time.Second) timeAfter := time.After(25 * time.Second)
for { for {
select { select {
case <-ctx.Done(): case _, ok := <-added:
logger.Log.Debugf("Watching API Server pod loop, ctx done") if !ok {
return added = nil
case <-added: continue
}
logger.Log.Debugf("Watching API Server pod loop, added") logger.Log.Debugf("Watching API Server pod loop, added")
continue case _, ok := <-removed:
case <-removed: if !ok {
removed = nil
continue
}
logger.Log.Infof("%s removed", mizu.ApiServerPodName) logger.Log.Infof("%s removed", mizu.ApiServerPodName)
cancel() cancel()
return return
case modifiedPod := <-modified: case modifiedPod, ok := <-modified:
if modifiedPod == nil { if !ok {
logger.Log.Debugf("Watching API Server pod loop, modifiedPod with nil") modified = nil
continue continue
} }
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true isPodReady = true
@ -510,14 +536,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Debugf("[Error] failed update tapped pods %v", err) logger.Log.Debugf("[Error] failed update tapped pods %v", err)
} }
} }
case _, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
cancel()
case <-timeAfter: case <-timeAfter:
if !isPodReady { if !isPodReady {
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time") logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
cancel() cancel()
} }
case <-errorChan: case <-ctx.Done():
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace) logger.Log.Debugf("Watching API Server pod loop, ctx done")
cancel() return
} }
} }
} }