mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Transform the podCache into a write-through cache.
Don't always clear podInfo, instead occasionally garbage collect.
This commit is contained in:
parent
ca6de16df7
commit
3624b65f1c
@ -352,6 +352,7 @@ func (m *Master) init(c *Config) {
|
|||||||
m.podRegistry,
|
m.podRegistry,
|
||||||
)
|
)
|
||||||
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
|
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
|
||||||
|
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)
|
||||||
|
|
||||||
// TODO: Factor out the core API registration
|
// TODO: Factor out the core API registration
|
||||||
m.storage = map[string]apiserver.RESTStorage{
|
m.storage = map[string]apiserver.RESTStorage{
|
||||||
|
@ -69,14 +69,38 @@ func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeI
|
|||||||
|
|
||||||
// GetPodStatus gets the stored pod status.
|
// GetPodStatus gets the stored pod status.
|
||||||
func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
|
func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
|
||||||
|
status := p.getPodStatusInternal(namespace, name)
|
||||||
|
if status != nil {
|
||||||
|
return status, nil
|
||||||
|
}
|
||||||
|
return p.updateCacheAndReturn(namespace, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PodCache) updateCacheAndReturn(namespace, name string) (*api.PodStatus, error) {
|
||||||
|
pod, err := p.pods.GetPod(api.WithNamespace(api.NewContext(), namespace), name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := p.updatePodStatus(pod); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
status := p.getPodStatusInternal(namespace, name)
|
||||||
|
if status == nil {
|
||||||
|
glog.Warningf("nil status after successful update. that's odd... (%s %s)", namespace, name)
|
||||||
|
return nil, client.ErrPodInfoNotAvailable
|
||||||
|
}
|
||||||
|
return status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PodCache) getPodStatusInternal(namespace, name string) *api.PodStatus {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
value, ok := p.podStatus[objKey{namespace, name}]
|
value, ok := p.podStatus[objKey{namespace, name}]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, client.ErrPodInfoNotAvailable
|
return nil
|
||||||
}
|
}
|
||||||
// Make a copy
|
// Make a copy
|
||||||
return &value, nil
|
return &value
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PodCache) ClearPodStatus(namespace, name string) {
|
func (p *PodCache) ClearPodStatus(namespace, name string) {
|
||||||
@ -178,10 +202,23 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||||||
return newStatus, err
|
return newStatus, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PodCache) resetNodeStatusCache() {
|
func (p *PodCache) GarbageCollectPodStatus() {
|
||||||
|
pods, err := p.pods.ListPods(api.NewContext(), labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error getting pod list: %v", err)
|
||||||
|
}
|
||||||
|
keys := map[objKey]bool{}
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
keys[objKey{pod.Namespace, pod.Name}] = true
|
||||||
|
}
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
p.currentNodes = map[objKey]api.NodeStatus{}
|
for key := range p.podStatus {
|
||||||
|
if _, found := keys[key]; !found {
|
||||||
|
glog.Infof("Deleting orphaned cache entry: %v", key)
|
||||||
|
delete(p.podStatus, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateAllContainers updates information about all containers.
|
// UpdateAllContainers updates information about all containers.
|
||||||
@ -189,8 +226,6 @@ func (p *PodCache) resetNodeStatusCache() {
|
|||||||
// calling again, or risk having new info getting clobbered by delayed
|
// calling again, or risk having new info getting clobbered by delayed
|
||||||
// old info.
|
// old info.
|
||||||
func (p *PodCache) UpdateAllContainers() {
|
func (p *PodCache) UpdateAllContainers() {
|
||||||
p.resetNodeStatusCache()
|
|
||||||
|
|
||||||
ctx := api.NewContext()
|
ctx := api.NewContext()
|
||||||
pods, err := p.pods.ListPods(ctx, labels.Everything())
|
pods, err := p.pods.ListPods(ctx, labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -126,7 +126,10 @@ func TestPodCacheGet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPodCacheDelete(t *testing.T) {
|
func TestPodCacheDelete(t *testing.T) {
|
||||||
cache := NewPodCache(nil, nil, nil, nil)
|
config := podCacheTestConfig{
|
||||||
|
err: client.ErrPodInfoNotAvailable,
|
||||||
|
}
|
||||||
|
cache := config.Construct()
|
||||||
|
|
||||||
expected := api.PodStatus{
|
expected := api.PodStatus{
|
||||||
Info: api.PodInfo{
|
Info: api.PodInfo{
|
||||||
@ -156,14 +159,38 @@ func TestPodCacheDelete(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPodCacheGetMissing(t *testing.T) {
|
func TestPodCacheGetMissing(t *testing.T) {
|
||||||
cache := NewPodCache(nil, nil, nil, nil)
|
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
|
||||||
|
config := podCacheTestConfig{
|
||||||
|
ipFunc: func(host string) string {
|
||||||
|
if host == "machine" {
|
||||||
|
return "1.2.3.5"
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
},
|
||||||
|
kubeletContainerInfo: api.PodStatus{
|
||||||
|
Info: api.PodInfo{"bar": api.ContainerStatus{}}},
|
||||||
|
nodes: []api.Node{*makeHealthyNode("machine")},
|
||||||
|
pod: pod1,
|
||||||
|
}
|
||||||
|
cache := config.Construct()
|
||||||
|
|
||||||
status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
|
status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
|
||||||
if err == nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected non-error: %+v", err)
|
t.Errorf("Unexpected error: %+v", err)
|
||||||
}
|
}
|
||||||
if status != nil {
|
if status == nil {
|
||||||
t.Errorf("Unexpected status: %+v", status)
|
t.Errorf("Unexpected non-status.")
|
||||||
|
}
|
||||||
|
expected := &api.PodStatus{
|
||||||
|
Phase: "Pending",
|
||||||
|
Host: "machine",
|
||||||
|
HostIP: "1.2.3.5",
|
||||||
|
Info: api.PodInfo{
|
||||||
|
"bar": api.ContainerStatus{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(status, expected) {
|
||||||
|
t.Errorf("expected:\n%#v\ngot:\n%#v\n", expected, status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,6 +204,8 @@ type podCacheTestConfig struct {
|
|||||||
ipFunc func(string) string // Construct will set a default if nil
|
ipFunc func(string) string // Construct will set a default if nil
|
||||||
nodes []api.Node
|
nodes []api.Node
|
||||||
pods []api.Pod
|
pods []api.Pod
|
||||||
|
pod *api.Pod
|
||||||
|
err error
|
||||||
kubeletContainerInfo api.PodStatus
|
kubeletContainerInfo api.PodStatus
|
||||||
|
|
||||||
// Construct will fill in these fields
|
// Construct will fill in these fields
|
||||||
@ -202,6 +231,8 @@ func (c *podCacheTestConfig) Construct() *PodCache {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
c.fakePods = registrytest.NewPodRegistry(&api.PodList{Items: c.pods})
|
c.fakePods = registrytest.NewPodRegistry(&api.PodList{Items: c.pods})
|
||||||
|
c.fakePods.Pod = c.pod
|
||||||
|
c.fakePods.Err = c.err
|
||||||
return NewPodCache(
|
return NewPodCache(
|
||||||
fakeIPCache(c.ipFunc),
|
fakeIPCache(c.ipFunc),
|
||||||
c.fakePodInfo,
|
c.fakePodInfo,
|
||||||
@ -829,3 +860,26 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGarbageCollection(t *testing.T) {
|
||||||
|
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
|
||||||
|
pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
|
||||||
|
config := podCacheTestConfig{
|
||||||
|
pods: []api.Pod{*pod1, *pod2},
|
||||||
|
}
|
||||||
|
cache := config.Construct()
|
||||||
|
|
||||||
|
expected := api.PodStatus{
|
||||||
|
Info: api.PodInfo{
|
||||||
|
"extra": api.ContainerStatus{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cache.podStatus[objKey{api.NamespaceDefault, "extra"}] = expected
|
||||||
|
|
||||||
|
cache.GarbageCollectPodStatus()
|
||||||
|
|
||||||
|
status, found := cache.podStatus[objKey{api.NamespaceDefault, "extra"}]
|
||||||
|
if found {
|
||||||
|
t.Errorf("unexpectedly found: %v for key %v", status, objKey{api.NamespaceDefault, "extra"})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user