mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Merge pull request #3355 from saad-ali/fix2495
Fix "Kubelet doesn't kill old pods when BoundPods is empty" issue
This commit is contained in:
commit
c0c7ffb797
@ -59,7 +59,16 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *sourceEtcd) run() {
|
func (s *sourceEtcd) run() {
|
||||||
watching := s.helper.Watch(s.key, 0)
|
boundPods := api.BoundPods{}
|
||||||
|
err := s.helper.ExtractToList(s.key, &boundPods)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Push update. Maybe an empty PodList to allow EtcdSource to be marked as seen
|
||||||
|
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.EtcdSource}
|
||||||
|
index, _ := s.helper.ResourceVersioner.ResourceVersion(&boundPods)
|
||||||
|
watching := s.helper.Watch(s.key, index)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event, ok := <-watching.ResultChan():
|
case event, ok := <-watching.ResultChan():
|
||||||
@ -87,6 +96,9 @@ func (s *sourceEtcd) run() {
|
|||||||
// It returns a list of containers, or an error if one occurs.
|
// It returns a list of containers, or an error if one occurs.
|
||||||
func eventToPods(ev watch.Event) ([]api.BoundPod, error) {
|
func eventToPods(ev watch.Event) ([]api.BoundPod, error) {
|
||||||
pods := []api.BoundPod{}
|
pods := []api.BoundPod{}
|
||||||
|
if ev.Object == nil {
|
||||||
|
return pods, nil
|
||||||
|
}
|
||||||
boundPods, ok := ev.Object.(*api.BoundPods)
|
boundPods, ok := ev.Object.(*api.BoundPods)
|
||||||
if !ok {
|
if !ok {
|
||||||
return pods, errors.New("unable to parse response as BoundPods")
|
return pods, errors.New("unable to parse response as BoundPods")
|
||||||
|
@ -33,7 +33,7 @@ func TestEventToPods(t *testing.T) {
|
|||||||
{
|
{
|
||||||
input: watch.Event{Object: nil},
|
input: watch.Event{Object: nil},
|
||||||
pods: []api.BoundPod{},
|
pods: []api.BoundPod{},
|
||||||
fail: true,
|
fail: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
input: watch.Event{Object: &api.BoundPods{}},
|
input: watch.Event{Object: &api.BoundPods{}},
|
||||||
|
@ -64,6 +64,8 @@ func (s *sourceFile) extractFromPath() error {
|
|||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Emit an update with an empty PodList to allow FileSource to be marked as seen
|
||||||
|
s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.FileSource}
|
||||||
return fmt.Errorf("path does not exist, ignoring")
|
return fmt.Errorf("path does not exist, ignoring")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,8 +92,14 @@ func TestUpdateOnNonExistentFile(t *testing.T) {
|
|||||||
NewSourceFile("random_non_existent_path", time.Millisecond, ch)
|
NewSourceFile("random_non_existent_path", time.Millisecond, ch)
|
||||||
select {
|
select {
|
||||||
case got := <-ch:
|
case got := <-ch:
|
||||||
t.Errorf("Expected no update, Got %#v", got)
|
update := got.(kubelet.PodUpdate)
|
||||||
|
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource)
|
||||||
|
if !api.Semantic.DeepEqual(expected, update) {
|
||||||
|
t.Fatalf("Expected %#v, Got %#v", expected, update)
|
||||||
|
}
|
||||||
|
|
||||||
case <-time.After(2 * time.Millisecond):
|
case <-time.After(2 * time.Millisecond):
|
||||||
|
t.Errorf("Expected update, timeout instead")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,6 +69,8 @@ func (s *sourceURL) extractFromURL() error {
|
|||||||
return fmt.Errorf("%v: %v", s.url, resp.Status)
|
return fmt.Errorf("%v: %v", s.url, resp.Status)
|
||||||
}
|
}
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
|
// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
|
||||||
|
s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.HTTPSource}
|
||||||
return fmt.Errorf("zero-length data received from %v", s.url)
|
return fmt.Errorf("zero-length data received from %v", s.url)
|
||||||
}
|
}
|
||||||
// Short circuit if the manifest has not changed since the last time it was read.
|
// Short circuit if the manifest has not changed since the last time it was read.
|
||||||
|
@ -1118,9 +1118,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
|||||||
}
|
}
|
||||||
case <-time.After(kl.resyncInterval):
|
case <-time.After(kl.resyncInterval):
|
||||||
glog.V(4).Infof("Periodic sync")
|
glog.V(4).Infof("Periodic sync")
|
||||||
if kl.pods == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := handler.SyncPods(kl.pods)
|
err := handler.SyncPods(kl.pods)
|
||||||
|
Loading…
Reference in New Issue
Block a user