Make locking simpler; add test for node existence cache behavior

commit 6916235513
parent dc5383dcf8

@@ -80,28 +80,35 @@ func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error)
     return &value, nil
 }
 
-// lock must *not* be held
-func (p *PodCache) nodeExists(name string) bool {
+func (p *PodCache) nodeExistsInCache(name string) (exists, cacheHit bool) {
     p.lock.Lock()
     defer p.lock.Unlock()
-    exists, cacheHit := p.currentNodes[objKey{"", name}]
+    exists, cacheHit = p.currentNodes[objKey{"", name}]
+    return exists, cacheHit
+}
+
+// lock must *not* be held
+func (p *PodCache) nodeExists(name string) bool {
+    exists, cacheHit := p.nodeExistsInCache(name)
     if cacheHit {
         return exists
     }
-    // Don't block everyone while looking up this minion.
-    // Because this may require an RPC to our storage (e.g. etcd).
-    func() {
-        p.lock.Unlock()
-        defer p.lock.Lock()
-        _, err := p.nodes.Get(name)
-        exists = true
-        if err != nil {
-            exists = false
-            if !errors.IsNotFound(err) {
-                glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
-            }
-        }
-    }()
+    // TODO: suppose there's N concurrent requests for node "foo"; in that case
+    // it might be useful to block all of them and only look up "foo" once.
+    // (This code will make up to N lookups.) One way of doing that would be to
+    // have a pool of M mutexes and require that before looking up "foo" you must
+    // lock mutex hash("foo") % M.
+    _, err := p.nodes.Get(name)
+    exists = true
+    if err != nil {
+        exists = false
+        if !errors.IsNotFound(err) {
+            glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
+        }
+    }
+
+    p.lock.Lock()
+    defer p.lock.Unlock()
+    p.currentNodes[objKey{"", name}] = exists
+    return exists
+}

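Aside (not part of the commit): a minimal standalone sketch of the "pool of M mutexes" idea mentioned in the TODO above, where a lookup for node "foo" first locks mutex hash("foo") % M so concurrent lookups for the same name serialize while different names usually proceed in parallel. The names here (stripedLocks, numLocks, forKey) are illustrative, not from the Kubernetes code.

package main

import (
    "fmt"
    "hash/fnv"
    "sync"
)

// numLocks is the M in the TODO; 16 is an arbitrary illustrative size.
const numLocks = 16

type stripedLocks [numLocks]sync.Mutex

// forKey returns the mutex guarding lookups for the given key.
func (s *stripedLocks) forKey(key string) *sync.Mutex {
    h := fnv.New32a()
    h.Write([]byte(key))
    return &s[h.Sum32()%numLocks]
}

func main() {
    var locks stripedLocks
    var wg sync.WaitGroup
    lookups := 0
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            m := locks.forKey("foo") // all "foo" lookups share one mutex
            m.Lock()
            defer m.Unlock()
            lookups++ // stand-in for the storage lookup; serialized per key
        }()
    }
    wg.Wait()
    fmt.Println(lookups) // 8
}
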
@@ -109,23 +116,32 @@ func (p *PodCache) nodeExists(name string) bool {
 // TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
 // entire pod?
 func (p *PodCache) updatePodStatus(pod *api.Pod) error {
+    newStatus, err := p.computePodStatus(pod)
+
+    p.lock.Lock()
+    defer p.lock.Unlock()
+    // Map accesses must be locked.
+    p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
+
+    return err
+}
+
+// computePodStatus always returns a new status, even if it also returns a non-nil error.
+// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
+// entire pod?
+func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
     newStatus := pod.Status
+
     if pod.Status.Host == "" {
-        p.lock.Lock()
-        defer p.lock.Unlock()
         // Not assigned.
         newStatus.Phase = api.PodPending
-        p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
-        return nil
+        return newStatus, nil
     }
 
     if !p.nodeExists(pod.Status.Host) {
-        p.lock.Lock()
-        defer p.lock.Unlock()
         // Assigned to non-existing node.
         newStatus.Phase = api.PodFailed
-        p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
-        return nil
+        return newStatus, nil
     }
 
     info, err := p.containerInfo.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name)

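Aside (not part of the commit): a standalone sketch of the locking pattern the new updatePodStatus follows, compute the status without holding the lock (the computation may involve RPCs), then take the lock only to publish the result into the shared map. Because the computation always returns a usable status even on error, the caller can store it unconditionally. The cache type and status values below are illustrative, not the commit's types.

package main

import (
    "fmt"
    "sync"
    "time"
)

type statusCache struct {
    lock   sync.Mutex
    status map[string]string
}

// compute does the slow work with no lock held.
func (c *statusCache) compute(key string) (string, error) {
    time.Sleep(10 * time.Millisecond) // stand-in for an RPC
    return "Running", nil
}

// update publishes the computed status under the lock.
func (c *statusCache) update(key string) error {
    newStatus, err := c.compute(key) // always returns a usable status
    c.lock.Lock()
    defer c.lock.Unlock()
    c.status[key] = newStatus // map access under the lock
    return err
}

func main() {
    c := &statusCache{status: map[string]string{}}
    if err := c.update("default/foo"); err != nil {
        fmt.Println("update error:", err)
    }
    fmt.Println(c.status["default/foo"]) // Running
}
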
@@ -142,27 +158,33 @@ func (p *PodCache) updatePodStatus(pod *api.Pod) error {
         }
     }
     }
+    return newStatus, err
+}
+
+func (p *PodCache) resetNodeExistenceCache() {
     p.lock.Lock()
     defer p.lock.Unlock()
-    p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
-    return err
+    p.currentNodes = map[objKey]bool{}
 }
 
 // UpdateAllContainers updates information about all containers.
+// Callers should let one call to UpdateAllContainers finish before
+// calling again, or risk having new info getting clobbered by delayed
+// old info.
 func (p *PodCache) UpdateAllContainers() {
-    func() {
-        // Reset which nodes we think exist
-        p.lock.Lock()
-        defer p.lock.Unlock()
-        p.currentNodes = map[objKey]bool{}
-    }()
+    p.resetNodeExistenceCache()
 
     ctx := api.NewContext()
     pods, err := p.pods.ListPods(ctx, labels.Everything())
     if err != nil {
-        glog.Errorf("Error synchronizing container list: %v", err)
+        glog.Errorf("Error getting pod list: %v", err)
         return
     }
+
+    // TODO: this algorithm is 1 goroutine & RPC per pod. With a little work,
+    // it should be possible to make it 1 per *node*, which will be important
+    // at very large scales. (To be clear, the goroutines shouldn't matter--
+    // it's the RPCs that need to be minimized.)
     var wg sync.WaitGroup
     for i := range pods.Items {
         pod := &pods.Items[i]

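Aside (not part of the commit): a sketch of the direction the TODO above points at, group pods by the node they run on and issue one query per node instead of one per pod, fanning out one goroutine per node. The pod type and fetchNodeInfo function are hypothetical stand-ins, not the commit's API.

package main

import (
    "fmt"
    "sync"
)

type pod struct{ host, namespace, name string }

// fetchNodeInfo stands in for a single per-node RPC that returns info for
// every pod on that node.
func fetchNodeInfo(host string, pods []pod) map[string]string {
    out := map[string]string{}
    for _, p := range pods {
        out[p.namespace+"/"+p.name] = "ok from " + host
    }
    return out
}

func main() {
    pods := []pod{
        {"machine-a", "default", "foo"},
        {"machine-a", "default", "baz"},
        {"machine-b", "default", "qux"},
    }

    // Group pods by host so each host is queried exactly once.
    byHost := map[string][]pod{}
    for _, p := range pods {
        byHost[p.host] = append(byHost[p.host], p)
    }

    var (
        wg      sync.WaitGroup
        mu      sync.Mutex
        results = map[string]string{}
    )
    for host, ps := range byHost {
        wg.Add(1)
        go func(host string, ps []pod) {
            defer wg.Done()
            info := fetchNodeInfo(host, ps) // one RPC per node, not per pod
            mu.Lock()
            defer mu.Unlock()
            for k, v := range info {
                results[k] = v
            }
        }(host, ps)
    }
    wg.Wait()
    fmt.Println(len(results), "pods updated with", len(byHost), "node queries")
}
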
@@ -171,7 +193,7 @@ func (p *PodCache) UpdateAllContainers() {
             defer wg.Done()
             err := p.updatePodStatus(pod)
             if err != nil && err != client.ErrPodInfoNotAvailable {
-                glog.Errorf("Error synchronizing container: %v", err)
+                glog.Errorf("Error getting info for pod %v/%v: %v", pod.Namespace, pod.Name, err)
             }
         }()
     }

The remaining hunks are in the pod cache's unit tests (package master):

@@ -18,6 +18,7 @@ package master
 
 import (
     "reflect"
+    "sync"
     "testing"
     "time"
 

@@ -27,19 +28,48 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 )
 
-type FakePodInfoGetter struct {
+type podInfoCall struct {
     host      string
-    id        string
     namespace string
-    data      api.PodContainerInfo
-    err       error
+    name      string
 }
 
-func (f *FakePodInfoGetter) GetPodInfo(host, namespace, id string) (api.PodContainerInfo, error) {
-    f.host = host
-    f.id = id
-    f.namespace = namespace
-    return f.data, f.err
+type podInfoResponse struct {
+    useCount int
+    data     api.PodContainerInfo
+    err      error
+}
+
+type podInfoCalls map[podInfoCall]*podInfoResponse
+
+type FakePodInfoGetter struct {
+    calls podInfoCalls
+    lock  sync.Mutex
+
+    // default data/error to return, or you can add
+    // responses to specific calls-- that will take precedence.
+    data api.PodContainerInfo
+    err  error
+}
+
+func (f *FakePodInfoGetter) GetPodInfo(host, namespace, name string) (api.PodContainerInfo, error) {
+    f.lock.Lock()
+    defer f.lock.Unlock()
+
+    if f.calls == nil {
+        f.calls = podInfoCalls{}
+    }
+
+    key := podInfoCall{host, namespace, name}
+    call, ok := f.calls[key]
+    if !ok {
+        f.calls[key] = &podInfoResponse{
+            0, f.data, f.err,
+        }
+        call = f.calls[key]
+    }
+    call.useCount++
+    return call.data, call.err
 }
 
 func TestPodCacheGetDifferentNamespace(t *testing.T) {

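Aside (not part of the commit): a distilled, standalone version of the call-recording fake above, responses are keyed per call, a default is used when no specific response was registered, and useCount records how many times each distinct call was made. The types here (call, response, fakeGetter) are illustrative, not the test's types.

package main

import (
    "fmt"
    "sync"
)

type call struct {
    host, namespace, name string
}

type response struct {
    useCount int
    data     string
    err      error
}

type fakeGetter struct {
    lock  sync.Mutex
    calls map[call]*response

    // defaults, used when no specific response was registered
    data string
    err  error
}

// get records the call and returns either a pre-registered response or the defaults.
func (f *fakeGetter) get(host, namespace, name string) (string, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    if f.calls == nil {
        f.calls = map[call]*response{}
    }
    key := call{host, namespace, name}
    r, ok := f.calls[key]
    if !ok {
        r = &response{data: f.data, err: f.err}
        f.calls[key] = r
    }
    r.useCount++
    return r.data, r.err
}

func main() {
    f := &fakeGetter{data: "default"}
    // Pre-register a specific response for one call; it takes precedence.
    f.calls = map[call]*response{
        {"machine", "default", "foo"}: {data: "special"},
    }
    fmt.Println(f.get("machine", "default", "foo")) // special <nil>
    fmt.Println(f.get("machine", "default", "baz")) // default <nil>
    fmt.Println(f.calls[call{"machine", "default", "foo"}].useCount) // 1
}
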
@@ -171,6 +201,7 @@ func makeNode(name string) *api.Node {
 
 func TestPodUpdateAllContainers(t *testing.T) {
     pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
+    pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
     config := podCacheTestConfig{
         ipFunc: func(host string) string {
             if host == "machine" {

@@ -180,15 +211,22 @@ func TestPodUpdateAllContainers(t *testing.T) {
         },
         kubeletContainerInfo: api.PodInfo{"bar": api.ContainerStatus{}},
         nodes:                []api.Node{*makeNode("machine")},
-        pods:                 []api.Pod{*pod},
+        pods:                 []api.Pod{*pod, *pod2},
     }
     cache := config.Construct()
 
     cache.UpdateAllContainers()
 
-    fake := config.fakePodInfo
-    if fake.host != "machine" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
-        t.Errorf("Unexpected access: %+v", fake)
+    call1 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "foo"}]
+    call2 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "baz"}]
+    if call1 == nil || call1.useCount != 1 {
+        t.Errorf("Expected 1 call for 'foo': %+v", config.fakePodInfo.calls)
+    }
+    if call2 == nil || call2.useCount != 1 {
+        t.Errorf("Expected 1 call for 'baz': %+v", config.fakePodInfo.calls)
+    }
+    if len(config.fakePodInfo.calls) != 2 {
+        t.Errorf("Expected 2 calls: %+v", config.fakePodInfo.calls)
     }
 
     status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")

@@ -201,6 +239,14 @@ func TestPodUpdateAllContainers(t *testing.T) {
     if e, a := "1.2.3.5", status.HostIP; e != a {
         t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", e, a)
     }
+
+    if e, a := 1, len(config.fakeNodes.Actions); e != a {
+        t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
+    } else {
+        if e, a := "get-minion", config.fakeNodes.Actions[0].Action; e != a {
+            t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
+        }
+    }
 }
 
 func TestFillPodStatusNoHost(t *testing.T) {