Support populating the runtime cache in PLEG
This change does not turn on this feature (the cache) for the kubelet.
commit b56ed1a8c2 (parent 032c0a4074)
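The new code stores and reads pod statuses through a kubecontainer.Cache. As orientation, here is a sketch of the cache surface this diff actually exercises, inferred from the calls that appear below (Set, Delete and UpdateTime in generic.go; Get and NewCache in the tests). It is written as if it lived in pkg/kubelet/container, so PodStatus and types.UID are the existing kubelet types; the real interface may declare more methods than these.

    // Sketch only; signatures inferred from the calls in this diff.
    type Cache interface {
        // Get returns the last stored PodStatus together with the error
        // recorded at inspection time (see updateCache below).
        Get(id types.UID) (*PodStatus, error)
        // Set stores the status, the inspection error, and the inspection time.
        Set(id types.UID, status *PodStatus, err error, timestamp time.Time)
        // Delete drops the entry for a pod that is no longer visible.
        Delete(id types.UID)
        // UpdateTime advances the cache's global timestamp once a relist has
        // refreshed every changed pod.
        UpdateTime(timestamp time.Time)
    }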
@@ -367,7 +367,7 @@ func NewMainKubelet(
         serializeImagePulls,
     )
 
-    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
+    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 case "rkt":
     conf := &rkt.Config{
         Path: rktPath,
@@ -388,7 +388,7 @@ func NewMainKubelet(
         return nil, err
     }
     klet.containerRuntime = rktRuntime
-    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
+    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 
     // No Docker daemon to put in a container.
     dockerDaemonContainer = ""
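Both call sites above pass nil, which keeps the cache off, matching the commit message. For illustration only, a hedged sketch of what opting in from NewMainKubelet might look like; klet.podCache is an invented field name, while kubecontainer.NewCache() and the new fourth parameter come from this diff:

    // Hypothetical opt-in wiring, not part of this commit.
    podCache := kubecontainer.NewCache()
    klet.podCache = podCache // invented field, for illustration only
    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, podCache)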
@@ -174,7 +174,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
     kubelet.resyncInterval = 10 * time.Second
     kubelet.workQueue = queue.NewBasicWorkQueue()
     // Relist period does not affect the tests.
-    kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
+    kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil)
     kubelet.clock = fakeClock
     return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock}
 }
@@ -25,6 +25,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/metrics"
     "k8s.io/kubernetes/pkg/types"
     "k8s.io/kubernetes/pkg/util"
+    "k8s.io/kubernetes/pkg/util/sets"
 )
 
 // GenericPLEG is an extremely simple generic PLEG that relies solely on
@@ -53,10 +54,12 @@ type GenericPLEG struct {
     podRecords podRecords
     // Time of the last relisting.
     lastRelistTime time.Time
+    // Cache for storing the runtime states required for syncing pods.
+    cache kubecontainer.Cache
 }
 
-// plegContainerState has an one-to-one mapping to the
-// kubecontainer.ContainerState except for the Non-existent state. This state
+// plegContainerState has a one-to-one mapping to the
+// kubecontainer.ContainerState except for the non-existent state. This state
 // is introduced here to complete the state transition scenarios.
 type plegContainerState string
 
@@ -88,12 +91,13 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord
 
 func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
-    relistPeriod time.Duration) PodLifecycleEventGenerator {
+    relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator {
     return &GenericPLEG{
         relistPeriod: relistPeriod,
         runtime:      runtime,
         eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
         podRecords:   make(podRecords),
+        cache:        cache,
     }
 }
 
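The widened constructor supports both modes. A minimal construction sketch, assuming runtime is an existing kubecontainer.Runtime; the capacity and period values are illustrative:

    var (
        // Cache disabled: relist() only emits lifecycle events
        // (cacheEnabled() below reports false).
        eventsOnlyPLEG = pleg.NewGenericPLEG(runtime, 1000, time.Second, nil)

        // Cache enabled: relist() also writes a PodStatus per changed pod and
        // advances the cache timestamp after each pass.
        cachingPLEG = pleg.NewGenericPLEG(runtime, 1000, time.Second, kubecontainer.NewCache())
    )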
@@ -161,64 +165,107 @@ func (g *GenericPLEG) relist() {
         return
     }
     pods := kubecontainer.Pods(podList)
 
-    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
     // Process all currently visible pods.
     for _, pod := range pods {
         g.podRecords.setCurrent(pod)
-        // Locate the old pod.
-        oldPod := g.podRecords.getOld(pod.ID)
-        // Process all currently visible containers in the pod.
-        for _, container := range pod.Containers {
-            cid := container.ID
-            oldState := getContainerState(oldPod, cid)
-            newState := convertState(container.State)
-            e := generateEvent(pod.ID, cid.ID, oldState, newState)
-            updateEvents(eventsByPodID, e)
-        }
-
-        if oldPod == nil {
-            continue
-        }
-        // Process all containers in the old pod, but no longer in the new pod.
-        for _, oldContainer := range oldPod.Containers {
-            cid := oldContainer.ID
-            oldState := convertState(oldContainer.State)
-            newState := getContainerState(pod, cid)
-            if newState != plegContainerNonExistent {
-                // We already processed the container.
-                continue
-            }
-            // Container no longer visible, generate an event.
-            e := generateEvent(pod.ID, cid.ID, oldState, plegContainerNonExistent)
-            updateEvents(eventsByPodID, e)
-        }
     }
 
-    // Process all pods that are no longer visible.
-    for pid := range g.podRecords {
-        if pod := g.podRecords.getCurrent(pid); pod != nil {
-            continue
-        }
-        oldPod := g.podRecords.getOld(pid)
-        for _, oldContainer := range oldPod.Containers {
-            cid := oldContainer.ID
-            oldState := convertState(oldContainer.State)
-            e := generateEvent(oldPod.ID, cid.ID, oldState, plegContainerNonExistent)
-            updateEvents(eventsByPodID, e)
-        }
-    }
-
-    // Update the internal storage.
-    g.podRecords.updateAll()
-
-    // Send out the events.
-    for _, events := range eventsByPodID {
+    // Compare the old and the current pods, and generate events.
+    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
+    for pid := range g.podRecords {
+        oldPod := g.podRecords.getOld(pid)
+        pod := g.podRecords.getCurrent(pid)
+        // Get all containers in the old and the new pod.
+        allContainers := getContainersFromPods(oldPod, pod)
+        for _, container := range allContainers {
+            e := computeEvent(oldPod, pod, &container.ID)
+            updateEvents(eventsByPodID, e)
+        }
+    }
+
+    // If there are events associated with a pod, we should update the
+    // podCache.
+    for pid, events := range eventsByPodID {
+        pod := g.podRecords.getCurrent(pid)
+        if g.cacheEnabled() {
+            // updateCache() will inspect the pod and update the cache. If an
+            // error occurs during the inspection, we want PLEG to retry again
+            // in the next relist. To achieve this, we do not update the
+            // associated podRecord of the pod, so that the change will be
+            // detect again in the next relist.
+            // TODO: If many pods changed during the same relist period,
+            // inspecting the pod and getting the PodStatus to update the cache
+            // serially may take a while. We should be aware of this and
+            // parallelize if needed.
+            if err := g.updateCache(pod, pid); err != nil {
+                glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
+                continue
+            }
+        }
+        // Update the internal storage and send out the events.
+        g.podRecords.update(pid)
         for i := range events {
             g.eventChannel <- events[i]
         }
     }
+
+    if g.cacheEnabled() {
+        // Update the cache timestamp. This needs to happen *after*
+        // all pods have been properly updated in the cache.
+        g.cache.UpdateTime(timestamp)
+    }
 }
 
+func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
+    cidSet := sets.NewString()
+    var containers []*kubecontainer.Container
+    for _, p := range pods {
+        if p == nil {
+            continue
+        }
+        for _, c := range p.Containers {
+            cid := string(c.ID.ID)
+            if cidSet.Has(cid) {
+                continue
+            }
+            cidSet.Insert(cid)
+            containers = append(containers, c)
+        }
+    }
+    return containers
+}
+
+func computeEvent(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) *PodLifecycleEvent {
+    var pid types.UID
+    if oldPod != nil {
+        pid = oldPod.ID
+    } else if newPod != nil {
+        pid = newPod.ID
+    }
+    oldState := getContainerState(oldPod, cid)
+    newState := getContainerState(newPod, cid)
+    return generateEvent(pid, cid.ID, oldState, newState)
+}
+
+func (g *GenericPLEG) cacheEnabled() bool {
+    return g.cache != nil
+}
+
+func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
+    if pod == nil {
+        // The pod is missing in the current relist. This means that
+        // the pod has no visible (active or inactive) containers.
+        glog.V(8).Infof("PLEG: Delete status for pod %q", string(pid))
+        g.cache.Delete(pid)
+        return nil
+    }
+    timestamp := time.Now()
+    // TODO: Consider adding a new runtime method
+    // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
+    // all containers again.
+    status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
+    glog.V(8).Infof("PLEG: Write status for %s/%s: %+v (err: %v)", pod.Name, pod.Namespace, status, err)
+    g.cache.Set(pod.ID, status, err, timestamp)
+    return err
+}
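Note that relist() sends a pod's events only after updateCache() has succeeded (or the cache is disabled), so a consumer that reads the cache in response to an event sees a status at least as new as the change that triggered it. This commit adds no such consumer; the following is a hedged sketch, with syncPod passed in as a caller-supplied callback rather than an existing kubelet function:

    import (
        kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
        "k8s.io/kubernetes/pkg/kubelet/pleg"
        "k8s.io/kubernetes/pkg/types"
    )

    // consumeEvents reacts to PLEG events by reading the cached status
    // instead of re-querying the runtime.
    func consumeEvents(ch <-chan *pleg.PodLifecycleEvent, cache kubecontainer.Cache,
        syncPod func(types.UID, *kubecontainer.PodStatus)) {
        for e := range ch {
            status, err := cache.Get(e.ID)
            if err != nil {
                // Events are only sent after a successful cache update, so an
                // error here would come from an older entry; skip and let the
                // next relist retry.
                continue
            }
            syncPod(e.ID, status)
        }
    }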
@@ -228,13 +275,13 @@ func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
     eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
 }
 
-func getContainerState(pod *kubecontainer.Pod, cid kubecontainer.ContainerID) plegContainerState {
+func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
     // Default to the non-existent state.
     state := plegContainerNonExistent
     if pod == nil {
         return state
     }
-    container := pod.FindContainerByID(cid)
+    container := pod.FindContainerByID(*cid)
     if container == nil {
         return state
     }
@@ -265,14 +312,20 @@ func (pr podRecords) setCurrent(pod *kubecontainer.Pod) {
     pr[pod.ID] = &podRecord{current: pod}
 }
 
-func (pr podRecords) updateAll() {
-    for k, r := range pr {
-        if r.current == nil {
-            // Pod no longer exists; delete the entry.
-            delete(pr, k)
-            continue
-        }
-        r.old = r.current
-        r.current = nil
+func (pr podRecords) update(id types.UID) {
+    r, ok := pr[id]
+    if !ok {
+        return
+    }
+    pr.updateInternal(id, r)
+}
+
+func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
+    if r.current == nil {
+        // Pod no longer exists; delete the entry.
+        delete(pr, id)
+        return
     }
+    r.old = r.current
+    r.current = nil
 }
@@ -17,12 +17,15 @@ limitations under the License.
 
 package pleg
 
 import (
     "fmt"
     "reflect"
     "sort"
     "testing"
     "time"
 
     "github.com/stretchr/testify/assert"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/types"
     "k8s.io/kubernetes/pkg/util"
 )
@@ -209,3 +212,108 @@ func TestReportMissingPods(t *testing.T) {
     actual := getEventsFromChannel(ch)
     verifyEvents(t, expected, actual)
 }
+
+func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *kubecontainer.Mock) {
+    runtimeMock := &kubecontainer.Mock{}
+    pleg := &GenericPLEG{
+        relistPeriod: time.Hour,
+        runtime:      runtimeMock,
+        eventChannel: make(chan *PodLifecycleEvent, 100),
+        podRecords:   make(podRecords),
+        cache:        kubecontainer.NewCache(),
+    }
+    return pleg, runtimeMock
+}
+
+func createTestPodsStatusesAndEvents(num int) ([]*kubecontainer.Pod, []*kubecontainer.PodStatus, []*PodLifecycleEvent) {
+    var pods []*kubecontainer.Pod
+    var statuses []*kubecontainer.PodStatus
+    var events []*PodLifecycleEvent
+    for i := 0; i < num; i++ {
+        id := types.UID(fmt.Sprintf("test-pod-%d", i))
+        cState := kubecontainer.ContainerStateRunning
+        container := createTestContainer(fmt.Sprintf("c%d", i), cState)
+        pod := &kubecontainer.Pod{
+            ID:         id,
+            Containers: []*kubecontainer.Container{container},
+        }
+        status := &kubecontainer.PodStatus{
+            ID:                id,
+            ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+        }
+        event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
+        pods = append(pods, pod)
+        statuses = append(statuses, status)
+        events = append(events, event)
+    }
+    return pods, statuses, events
+}
+
+func TestRelistWithCache(t *testing.T) {
+    pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+    ch := pleg.Watch()
+
+    pods, statuses, events := createTestPodsStatusesAndEvents(2)
+    runtimeMock.On("GetPods", true).Return(pods, nil)
+    runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once()
+    // Inject an error when querying runtime for the pod status for pods[1].
+    statusErr := fmt.Errorf("unable to get status")
+    runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(&kubecontainer.PodStatus{}, statusErr).Once()
+
+    pleg.relist()
+    actualEvents := getEventsFromChannel(ch)
+    cases := []struct {
+        pod    *kubecontainer.Pod
+        status *kubecontainer.PodStatus
+        error  error
+    }{
+        {pod: pods[0], status: statuses[0], error: nil},
+        {pod: pods[1], status: &kubecontainer.PodStatus{}, error: statusErr},
+    }
+    for i, c := range cases {
+        testStr := fmt.Sprintf("test[%d]", i)
+        actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
+        assert.Equal(t, c.status, actualStatus, testStr)
+        assert.Equal(t, c.error, actualErr, testStr)
+    }
+    // pleg should not generate any event for pods[1] because of the error.
+    assert.Exactly(t, []*PodLifecycleEvent{events[0]}, actualEvents)
+
+    // Return normal status for pods[1].
+    runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(statuses[1], nil).Once()
+    pleg.relist()
+    actualEvents = getEventsFromChannel(ch)
+    cases = []struct {
+        pod    *kubecontainer.Pod
+        status *kubecontainer.PodStatus
+        error  error
+    }{
+        {pod: pods[0], status: statuses[0], error: nil},
+        {pod: pods[1], status: statuses[1], error: nil},
+    }
+    for i, c := range cases {
+        testStr := fmt.Sprintf("test[%d]", i)
+        actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
+        assert.Equal(t, c.status, actualStatus, testStr)
+        assert.Equal(t, c.error, actualErr, testStr)
+    }
+    // Now that we are able to query status for pods[1], pleg should generate an event.
+    assert.Exactly(t, []*PodLifecycleEvent{events[1]}, actualEvents)
+}
+
+func TestRemoveCacheEntry(t *testing.T) {
+    pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+    pods, statuses, _ := createTestPodsStatusesAndEvents(1)
+    runtimeMock.On("GetPods", true).Return(pods, nil).Once()
+    runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once()
+    // Does a relist to populate the cache.
+    pleg.relist()
+    // Delete the pod from runtime. Verify that the cache entry has been
+    // removed after relisting.
+    runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{}, nil).Once()
+    pleg.relist()
+    actualStatus, actualErr := pleg.cache.Get(pods[0].ID)
+    assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
+    assert.Equal(t, nil, actualErr)
+}