Merge pull request #2143 from lavalamp/eventing2.1

Begin putting event emission into kubelet
This commit is contained in:
Dawn Chen 2014-11-05 15:37:26 -08:00
commit e88e490614
6 changed files with 155 additions and 30 deletions

View File

@ -36,6 +36,10 @@ func GetReference(obj runtime.Object) (*ObjectReference, error) {
if obj == nil {
return nil, ErrNilObject
}
if ref, ok := obj.(*ObjectReference); ok {
// Don't make a reference to a reference.
return ref, nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return nil, err
@ -57,3 +61,17 @@ func GetReference(obj runtime.Object) (*ObjectReference, error) {
ResourceVersion: meta.ResourceVersion(),
}, nil
}
// GetPartialReference is exactly like GetReference, but allows you to set the FieldPath.
func GetPartialReference(obj runtime.Object, fieldPath string) (*ObjectReference, error) {
ref, err := GetReference(obj)
if err != nil {
return nil, err
}
ref.FieldPath = fieldPath
return ref, nil
}
// Allow clients to preemptively get a reference to an API object and pass it to places that
// intend only to get a reference to that object. This simplifies the event recording interface.
func (*ObjectReference) IsAnAPIObject() {}

View File

@ -31,6 +31,7 @@ func TestGetReference(t *testing.T) {
table := map[string]struct {
obj runtime.Object
ref *ObjectReference
fieldPath string
shouldErr bool
}{
"pod": {
@ -42,12 +43,14 @@ func TestGetReference(t *testing.T) {
SelfLink: "/api/v1beta1/pods/foo",
},
},
fieldPath: ".desiredState.containers[0]",
ref: &ObjectReference{
Kind: "Pod",
APIVersion: "v1beta1",
Name: "foo",
UID: "bar",
ResourceVersion: "42",
FieldPath: ".desiredState.containers[0]",
},
},
"serviceList": {
@ -85,7 +88,7 @@ func TestGetReference(t *testing.T) {
}
for name, item := range table {
ref, err := GetReference(item.obj)
ref, err := GetPartialReference(item.obj, item.fieldPath)
if e, a := item.shouldErr, (err != nil); e != a {
t.Errorf("%v: expected %v, got %v", name, e, a)
continue

View File

@ -91,7 +91,8 @@ const queueLen = 1000
var events = watch.NewMux(queueLen)
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about; 'fieldPath', if not "", locates a part of 'object'.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'status' is the new status of the object. 'reason' is the reason it now has this status.
// Both 'status' and 'reason' should be short and unique; they will be used to automate
// handling of events, so imagine people writing switch statements to handle them. You want to
@ -99,13 +100,12 @@ var events = watch.NewMux(queueLen)
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
func Event(object runtime.Object, fieldPath, status, reason, message string) {
func Event(object runtime.Object, status, reason, message string) {
ref, err := api.GetReference(object)
if err != nil {
glog.Errorf("Could not construct reference to: %#v due to: %v", object, err)
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, status, reason, message)
return
}
ref.FieldPath = fieldPath
t := util.Now()
e := &api.Event{
@ -124,6 +124,6 @@ func Event(object runtime.Object, fieldPath, status, reason, message string) {
}
// Eventf is just like Event, but with Sprintf for the message field.
func Eventf(object runtime.Object, fieldPath, status, reason, messageFmt string, args ...interface{}) {
Event(object, fieldPath, status, reason, fmt.Sprintf(messageFmt, args...))
func Eventf(object runtime.Object, status, reason, messageFmt string, args ...interface{}) {
Event(object, status, reason, fmt.Sprintf(messageFmt, args...))
}

View File

@ -45,24 +45,28 @@ func (t *testEventRecorder) clearOnEvent() {
}
func TestEventf(t *testing.T) {
testPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
SelfLink: "/api/v1beta1/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
}
testRef, err := api.GetPartialReference(testPod, "desiredState.manifest.containers[2]")
if err != nil {
t.Fatal(err)
}
table := []struct {
obj runtime.Object
fieldPath, status, reason string
messageFmt string
elements []interface{}
expect *api.Event
expectLog string
obj runtime.Object
status, reason string
messageFmt string
elements []interface{}
expect *api.Event
expectLog string
}{
{
obj: &api.Pod{
ObjectMeta: api.ObjectMeta{
SelfLink: "/api/v1beta1/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
},
fieldPath: "desiredState.manifest.containers[2]",
obj: testRef,
status: "running",
reason: "started",
messageFmt: "some verbose message: %v",
@ -87,6 +91,31 @@ func TestEventf(t *testing.T) {
},
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): status: 'running', reason: 'started' some verbose message: 1`,
},
{
obj: testPod,
status: "running",
reason: "started",
messageFmt: "some verbose message: %v",
elements: []interface{}{1},
expect: &api.Event{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "baz",
},
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1beta1",
},
Status: "running",
Reason: "started",
Message: "some verbose message: 1",
Source: "eventTest",
},
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:""}): status: 'running', reason: 'started' some verbose message: 1`,
},
}
for _, item := range table {
@ -120,7 +149,7 @@ func TestEventf(t *testing.T) {
called <- struct{}{}
})
record.Eventf(item.obj, item.fieldPath, item.status, item.reason, item.messageFmt, item.elements...)
record.Eventf(item.obj, item.status, item.reason, item.messageFmt, item.elements...)
<-called
<-called

View File

@ -30,6 +30,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -73,6 +74,7 @@ func NewMainKubelet(
resyncInterval: ri,
networkContainerImage: ni,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dc),
httpClient: &http.Client{},
pullQPS: pullQPS,
@ -93,6 +95,7 @@ func NewIntegrationTestKubelet(hn string, rd string, dc dockertools.DockerInterf
networkContainerImage: NetworkContainerImage,
resyncInterval: 3 * time.Second,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
}
}
@ -110,6 +113,11 @@ type Kubelet struct {
resyncInterval time.Duration
pods []api.BoundPod
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
dockerIDToRef map[dockertools.DockerID]*api.ObjectReference
refLock sync.RWMutex
// Optional, no events will be sent without it
etcdClient tools.EtcdClient
// Optional, defaults to simple implementaiton
@ -381,6 +389,57 @@ func (kl *Kubelet) runHandler(podFullName, uuid string, container *api.Container
return actionHandler.Run(podFullName, uuid, container, handler)
}
// fieldPath returns a fieldPath locating container within pod.
// Returns an error if the container isn't part of the pod.
func fieldPath(pod *api.BoundPod, container *api.Container) (string, error) {
for i := range pod.Spec.Containers {
here := &pod.Spec.Containers[i]
if here == container {
return fmt.Sprintf("spec.containers[%n]", i), nil
}
}
return "", fmt.Errorf("container %#v not found in pod %#v", container, pod)
}
// containerRef returns an *api.ObjectReference which references the given container within the
// given pod. Returns an error if the reference can't be constructed or the container doesn't
// actually belong to the pod.
func containerRef(pod *api.BoundPod, container *api.Container) (*api.ObjectReference, error) {
fieldPath, err := fieldPath(pod, container)
if err != nil {
// TODO: figure out intelligent way to refer to containers that we implicitly
// start (like the network container). This is not a good way, ugh.
fieldPath = "implicitly required container " + container.Name
}
ref, err := api.GetPartialReference(pod, fieldPath)
if err != nil {
return nil, err
}
return ref, nil
}
// setRef stores a reference to a pod's container, associating it with the given docker id.
func (kl *Kubelet) setRef(id dockertools.DockerID, ref *api.ObjectReference) {
kl.refLock.Lock()
defer kl.refLock.Unlock()
kl.dockerIDToRef[id] = ref
}
// clearRef forgets the given docker id and its associated container reference.
func (kl *Kubelet) clearRef(id dockertools.DockerID) {
kl.refLock.Lock()
defer kl.refLock.Unlock()
delete(kl.dockerIDToRef, id)
}
// getRef returns the container reference of the given id, or (nil, false) if none is stored.
func (kl *Kubelet) getRef(id dockertools.DockerID) (ref *api.ObjectReference, ok bool) {
kl.refLock.RLock()
defer kl.refLock.RUnlock()
ref, ok = kl.dockerIDToRef[id]
return ref, ok
}
// Run a single container from a pod. Returns the docker container ID
func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) {
envVariables := makeEnvironmentVariables(container)
@ -416,7 +475,19 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
NetworkMode: netMode,
Privileged: privileged,
})
if err == nil && container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
if err != nil {
return "", err
}
if ref, err := containerRef(pod, container); err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
} else {
// Remember this reference so we can report events about this container
kl.setRef(dockertools.DockerID(dockerContainer.ID), ref)
record.Eventf(ref, "", "running", "started", "Started with docker id %v", dockerContainer.ID)
}
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
if handlerErr != nil {
kl.killContainerByID(dockerContainer.ID, "")
@ -438,9 +509,13 @@ func (kl *Kubelet) killContainerByID(ID, name string) error {
return err
}
// TODO(lavalamp): restore event logging:
// podFullName, uuid, containerName, _ := dockertools.ParseDockerName(name)
// kl.LogEvent(&api.Event{})
ref, ok := kl.getRef(dockertools.DockerID(ID))
if !ok {
glog.Warningf("No ref for pod '%v' - '%v'", ID, name)
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
record.Eventf(ref, "", "killing", "Killing %v - %v", ID, name)
}
return err
}

View File

@ -68,7 +68,7 @@ func (s *Scheduler) scheduleOne() {
pod := s.config.NextPod()
dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister)
if err != nil {
record.Eventf(pod, "", string(api.PodPending), "failedScheduling", "Error scheduling: %v", err)
record.Eventf(pod, string(api.PodPending), "failedScheduling", "Error scheduling: %v", err)
s.config.Error(pod, err)
return
}
@ -78,9 +78,9 @@ func (s *Scheduler) scheduleOne() {
Host: dest,
}
if err := s.config.Binder.Bind(b); err != nil {
record.Eventf(pod, "", string(api.PodPending), "failedScheduling", "Binding rejected: %v", err)
record.Eventf(pod, string(api.PodPending), "failedScheduling", "Binding rejected: %v", err)
s.config.Error(pod, err)
return
}
record.Eventf(pod, "", string(api.PodPending), "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
record.Eventf(pod, string(api.PodPending), "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
}