Updating cadvisor deps. This is necessary for handling sys oom events in kubelet.

This commit is contained in:
Vishnu Kannan 2015-04-03 00:22:55 +00:00
parent 76f1232a2e
commit 86d421ec5e
13 changed files with 290 additions and 272 deletions

64
Godeps/Godeps.json generated
View File

@ -212,83 +212,83 @@
}, },
{ {
"ImportPath": "github.com/google/cadvisor/api", "ImportPath": "github.com/google/cadvisor/api",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/container", "ImportPath": "github.com/google/cadvisor/container",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/events", "ImportPath": "github.com/google/cadvisor/events",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/fs", "ImportPath": "github.com/google/cadvisor/fs",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/healthz", "ImportPath": "github.com/google/cadvisor/healthz",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/http", "ImportPath": "github.com/google/cadvisor/http",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/info/v1", "ImportPath": "github.com/google/cadvisor/info/v1",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/info/v2", "ImportPath": "github.com/google/cadvisor/info/v2",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/manager", "ImportPath": "github.com/google/cadvisor/manager",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/metrics", "ImportPath": "github.com/google/cadvisor/metrics",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/pages", "ImportPath": "github.com/google/cadvisor/pages",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/storage", "ImportPath": "github.com/google/cadvisor/storage",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/summary", "ImportPath": "github.com/google/cadvisor/summary",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/utils", "ImportPath": "github.com/google/cadvisor/utils",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/validate", "ImportPath": "github.com/google/cadvisor/validate",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/cadvisor/version", "ImportPath": "github.com/google/cadvisor/version",
"Comment": "0.10.1-106-gfd9f7e0", "Comment": "0.11.0",
"Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233"
}, },
{ {
"ImportPath": "github.com/google/gofuzz", "ImportPath": "github.com/google/gofuzz",

View File

@ -133,7 +133,7 @@ func writeResult(res interface{}, w http.ResponseWriter) error {
} }
func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Request) error { func streamResults(eventChannel *events.EventChannel, w http.ResponseWriter, r *http.Request, m manager.Manager) error {
cn, ok := w.(http.CloseNotifier) cn, ok := w.(http.CloseNotifier)
if !ok { if !ok {
return errors.New("could not access http.CloseNotifier") return errors.New("could not access http.CloseNotifier")
@ -151,8 +151,10 @@ func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Re
for { for {
select { select {
case <-cn.CloseNotify(): case <-cn.CloseNotify():
glog.V(3).Infof("Received CloseNotify event")
m.CloseEventChannel(eventChannel.GetWatchId())
return nil return nil
case ev := <-results: case ev := <-eventChannel.GetChannel():
glog.V(3).Infof("Received event from watch channel in api: %v", ev) glog.V(3).Infof("Received event from watch channel in api: %v", ev)
err := enc.Encode(ev) err := enc.Encode(ev)
if err != nil { if err != nil {
@ -178,19 +180,19 @@ func getContainerInfoRequest(body io.ReadCloser) (*info.ContainerInfoRequest, er
// with any twice defined arguments being assigned the first value. // with any twice defined arguments being assigned the first value.
// If the value type for the argument is wrong the field will be assumed to be // If the value type for the argument is wrong the field will be assumed to be
// unassigned // unassigned
// bools: historical, subcontainers, oom_events, creation_events, deletion_events // bools: stream, subcontainers, oom_events, creation_events, deletion_events
// ints: max_events, start_time (unix timestamp), end_time (unix timestamp) // ints: max_events, start_time (unix timestamp), end_time (unix timestamp)
// example r.URL: http://localhost:8080/api/v1.3/events?oom_events=true&historical=true&max_events=10 // example r.URL: http://localhost:8080/api/v1.3/events?oom_events=true&stream=true
func getEventRequest(r *http.Request) (*events.Request, bool, error) { func getEventRequest(r *http.Request) (*events.Request, bool, error) {
query := events.NewRequest() query := events.NewRequest()
getHistoricalEvents := false stream := false
urlMap := r.URL.Query() urlMap := r.URL.Query()
if val, ok := urlMap["historical"]; ok { if val, ok := urlMap["stream"]; ok {
newBool, err := strconv.ParseBool(val[0]) newBool, err := strconv.ParseBool(val[0])
if err == nil { if err == nil {
getHistoricalEvents = newBool stream = newBool
} }
} }
if val, ok := urlMap["subcontainers"]; ok { if val, ok := urlMap["subcontainers"]; ok {
@ -202,19 +204,19 @@ func getEventRequest(r *http.Request) (*events.Request, bool, error) {
if val, ok := urlMap["oom_events"]; ok { if val, ok := urlMap["oom_events"]; ok {
newBool, err := strconv.ParseBool(val[0]) newBool, err := strconv.ParseBool(val[0])
if err == nil { if err == nil {
query.EventType[events.TypeOom] = newBool query.EventType[info.EventOom] = newBool
} }
} }
if val, ok := urlMap["creation_events"]; ok { if val, ok := urlMap["creation_events"]; ok {
newBool, err := strconv.ParseBool(val[0]) newBool, err := strconv.ParseBool(val[0])
if err == nil { if err == nil {
query.EventType[events.TypeContainerCreation] = newBool query.EventType[info.EventContainerCreation] = newBool
} }
} }
if val, ok := urlMap["deletion_events"]; ok { if val, ok := urlMap["deletion_events"]; ok {
newBool, err := strconv.ParseBool(val[0]) newBool, err := strconv.ParseBool(val[0])
if err == nil { if err == nil {
query.EventType[events.TypeContainerDeletion] = newBool query.EventType[info.EventContainerDeletion] = newBool
} }
} }
if val, ok := urlMap["max_events"]; ok { if val, ok := urlMap["max_events"]; ok {
@ -239,7 +241,7 @@ func getEventRequest(r *http.Request) (*events.Request, bool, error) {
glog.V(2).Infof( glog.V(2).Infof(
"%v was returned in api/handler.go:getEventRequest from the url rawQuery %v", "%v was returned in api/handler.go:getEventRequest from the url rawQuery %v",
query, r.URL.RawQuery) query, r.URL.RawQuery)
return query, getHistoricalEvents, nil return query, stream, nil
} }
func getContainerName(request []string) string { func getContainerName(request []string) string {

View File

@ -20,7 +20,6 @@ import (
"strconv" "strconv"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/google/cadvisor/events"
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/manager" "github.com/google/cadvisor/manager"
@ -58,7 +57,7 @@ func getApiVersions() []ApiVersion {
v1_1 := newVersion1_1(v1_0) v1_1 := newVersion1_1(v1_0)
v1_2 := newVersion1_2(v1_1) v1_2 := newVersion1_2(v1_1)
v1_3 := newVersion1_3(v1_2) v1_3 := newVersion1_3(v1_2)
v2_0 := newVersion2_0(v1_3) v2_0 := newVersion2_0()
return []ApiVersion{v1_0, v1_1, v1_2, v1_3, v2_0} return []ApiVersion{v1_0, v1_1, v1_2, v1_3, v2_0}
@ -262,40 +261,40 @@ func (self *version1_3) SupportedRequestTypes() []string {
func (self *version1_3) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error { func (self *version1_3) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error {
switch requestType { switch requestType {
case eventsApi: case eventsApi:
query, eventsFromAllTime, err := getEventRequest(r) return handleEventRequest(m, w, r)
if err != nil {
return err
}
glog.V(2).Infof("Api - Events(%v)", query)
if eventsFromAllTime {
pastEvents, err := m.GetPastEvents(query)
if err != nil {
return err
}
return writeResult(pastEvents, w)
}
eventsChannel := make(chan *events.Event, 10)
err = m.WatchForEvents(query, eventsChannel)
if err != nil {
return err
}
return streamResults(eventsChannel, w, r)
default: default:
return self.baseVersion.HandleRequest(requestType, request, m, w, r) return self.baseVersion.HandleRequest(requestType, request, m, w, r)
} }
} }
// API v2.0 func handleEventRequest(m manager.Manager, w http.ResponseWriter, r *http.Request) error {
query, stream, err := getEventRequest(r)
if err != nil {
return err
}
glog.V(2).Infof("Api - Events(%v)", query)
if !stream {
pastEvents, err := m.GetPastEvents(query)
if err != nil {
return err
}
return writeResult(pastEvents, w)
}
eventChannel, err := m.WatchForEvents(query)
if err != nil {
return err
}
return streamResults(eventChannel, w, r, m)
// v2.0 builds on v1.3
type version2_0 struct {
baseVersion *version1_3
} }
func newVersion2_0(v *version1_3) *version2_0 { // API v2.0
return &version2_0{
baseVersion: v, type version2_0 struct {
} }
func newVersion2_0() *version2_0 {
return &version2_0{}
} }
func (self *version2_0) Version() string { func (self *version2_0) Version() string {
@ -303,7 +302,7 @@ func (self *version2_0) Version() string {
} }
func (self *version2_0) SupportedRequestTypes() []string { func (self *version2_0) SupportedRequestTypes() []string {
return append(self.baseVersion.SupportedRequestTypes(), summaryApi) return []string{versionApi, attributesApi, eventsApi, machineApi, summaryApi, statsApi, specApi, storageApi}
} }
func (self *version2_0) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error { func (self *version2_0) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error {
@ -388,8 +387,10 @@ func (self *version2_0) HandleRequest(requestType string, request []string, m ma
} }
} }
return writeResult(fi, w) return writeResult(fi, w)
case eventsApi:
return handleEventRequest(m, w, r)
default: default:
return self.baseVersion.HandleRequest(requestType, request, m, w, r) return fmt.Errorf("unknown request type %q", requestType)
} }
} }

View File

@ -21,6 +21,7 @@ import (
"testing" "testing"
"github.com/google/cadvisor/events" "github.com/google/cadvisor/events"
info "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -33,20 +34,19 @@ func makeHTTPRequest(requestURL string, t *testing.T) *http.Request {
} }
func TestGetEventRequestBasicRequest(t *testing.T) { func TestGetEventRequestBasicRequest(t *testing.T) {
r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?oom_events=true&historical=true&max_events=10", t) r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?oom_events=true&stream=false&max_events=20", t)
expectedQuery := &events.Request{ expectedQuery := events.NewRequest()
EventType: map[events.EventType]bool{ expectedQuery.EventType = map[info.EventType]bool{
events.TypeOom: true, info.EventOom: true,
},
MaxEventsReturned: 10,
} }
expectedQuery.MaxEventsReturned = 20
receivedQuery, getHistoricalEvents, err := getEventRequest(r) receivedQuery, stream, err := getEventRequest(r)
if !reflect.DeepEqual(expectedQuery, receivedQuery) { if !reflect.DeepEqual(expectedQuery, receivedQuery) {
t.Errorf("expected %v but received %v", expectedQuery, receivedQuery) t.Errorf("expected %#v but received %#v", expectedQuery, receivedQuery)
} }
assert.True(t, getHistoricalEvents) assert.False(t, stream)
assert.Nil(t, err) assert.Nil(t, err)
} }
@ -54,28 +54,27 @@ func TestGetEventEmptyRequest(t *testing.T) {
r := makeHTTPRequest("", t) r := makeHTTPRequest("", t)
expectedQuery := events.NewRequest() expectedQuery := events.NewRequest()
receivedQuery, getHistoricalEvents, err := getEventRequest(r) receivedQuery, stream, err := getEventRequest(r)
if !reflect.DeepEqual(expectedQuery, receivedQuery) { if !reflect.DeepEqual(expectedQuery, receivedQuery) {
t.Errorf("expected %v but received %v", expectedQuery, receivedQuery) t.Errorf("expected %#v but received %#v", expectedQuery, receivedQuery)
} }
assert.False(t, getHistoricalEvents) assert.False(t, stream)
assert.Nil(t, err) assert.Nil(t, err)
} }
func TestGetEventRequestDoubleArgument(t *testing.T) { func TestGetEventRequestDoubleArgument(t *testing.T) {
r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?historical=true&oom_events=true&oom_events=false", t) r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?stream=true&oom_events=true&oom_events=false", t)
expectedQuery := &events.Request{ expectedQuery := events.NewRequest()
EventType: map[events.EventType]bool{ expectedQuery.EventType = map[info.EventType]bool{
events.TypeOom: true, info.EventOom: true,
},
} }
receivedQuery, getHistoricalEvents, err := getEventRequest(r) receivedQuery, stream, err := getEventRequest(r)
if !reflect.DeepEqual(expectedQuery, receivedQuery) { if !reflect.DeepEqual(expectedQuery, receivedQuery) {
t.Errorf("expected %v but received %v", expectedQuery, receivedQuery) t.Errorf("expected %#v but received %#v", expectedQuery, receivedQuery)
} }
assert.True(t, getHistoricalEvents) assert.True(t, stream)
assert.Nil(t, err) assert.Nil(t, err)
} }

View File

@ -20,6 +20,9 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/golang/glog"
info "github.com/google/cadvisor/info/v1"
) )
// EventManager is implemented by Events. It provides two ways to monitor // EventManager is implemented by Events. It provides two ways to monitor
@ -27,13 +30,15 @@ import (
type EventManager interface { type EventManager interface {
// Watch checks if events fed to it by the caller of AddEvent satisfy the // Watch checks if events fed to it by the caller of AddEvent satisfy the
// request and if so sends the event back to the caller on outChannel // request and if so sends the event back to the caller on outChannel
WatchEvents(outChannel chan *Event, request *Request) error WatchEvents(request *Request) (*EventChannel, error)
// GetEvents() returns a slice of all events detected that have passed // GetEvents() returns a slice of all events detected that have passed
// the *Request object parameters to the caller // the *Request object parameters to the caller
GetEvents(request *Request) (EventSlice, error) GetEvents(request *Request) (EventSlice, error)
// AddEvent allows the caller to add an event to an EventManager // AddEvent allows the caller to add an event to an EventManager
// object // object
AddEvent(e *Event) error AddEvent(e *info.Event) error
// Removes a watch instance from the EventManager's watchers map
StopWatch(watch_id int)
} }
// Events holds a slice of *Event objects with a potential field // Events holds a slice of *Event objects with a potential field
@ -47,11 +52,14 @@ type events struct {
// linked to different calls of WatchEvents. When new events are found that // linked to different calls of WatchEvents. When new events are found that
// satisfy the request of a given watch object in watchers, the event // satisfy the request of a given watch object in watchers, the event
// is sent over the channel to that caller of WatchEvents // is sent over the channel to that caller of WatchEvents
watchers []*watch watchers map[int]*watch
// lock that blocks eventlist from being accessed until a writer releases it // lock that blocks eventlist from being accessed until a writer releases it
eventsLock sync.RWMutex eventsLock sync.RWMutex
// lock that blocks watchers from being accessed until a writer releases it // lock that blocks watchers from being accessed until a writer releases it
watcherLock sync.RWMutex watcherLock sync.RWMutex
// receives notices when a watch event ends and needs to be removed from
// the watchers list
lastId int
} }
// initialized by a call to WatchEvents(), a watch struct will then be added // initialized by a call to WatchEvents(), a watch struct will then be added
@ -66,26 +74,14 @@ type watch struct {
request *Request request *Request
// a channel created by the caller through which events satisfying the // a channel created by the caller through which events satisfying the
// request are sent to the caller // request are sent to the caller
channel chan *Event eventChannel *EventChannel
// unique identifier of a watch that is used as a key in events' watchers
// map
id int
} }
// typedef of a slice of Event pointers // typedef of a slice of Event pointers
type EventSlice []*Event type EventSlice []*info.Event
// Event contains information general to events such as the time at which they
// occurred, their specific type, and the actual event. Event types are
// differentiated by the EventType field of Event.
type Event struct {
// the absolute container name for which the event occurred
ContainerName string
// the time at which the event occurred
Timestamp time.Time
// the type of event. EventType is an enumerated type
EventType EventType
// the original event object and all of its extraneous data, ex. an
// OomInstance
EventData EventDataInterface
}
// Request holds a set of parameters by which Event objects may be screened. // Request holds a set of parameters by which Event objects may be screened.
// The caller may want events that occurred within a specific timeframe // The caller may want events that occurred within a specific timeframe
@ -99,7 +95,7 @@ type Request struct {
// must be left blank in calls to WatchEvents // must be left blank in calls to WatchEvents
EndTime time.Time EndTime time.Time
// EventType is a map that specifies the type(s) of events wanted // EventType is a map that specifies the type(s) of events wanted
EventType map[EventType]bool EventType map[info.EventType]bool
// allows the caller to put a limit on how many // allows the caller to put a limit on how many
// events they receive. If there are more events than MaxEventsReturned // events they receive. If there are more events than MaxEventsReturned
// then the most chronologically recent events in the time period // then the most chronologically recent events in the time period
@ -112,45 +108,51 @@ type Request struct {
IncludeSubcontainers bool IncludeSubcontainers bool
} }
// EventType is an enumerated type which lists the categories under which type EventChannel struct {
// events may fall. The Event field EventType is populated by this enum. watchId int
type EventType int channel chan *info.Event
}
const ( func NewEventChannel(watchId int) *EventChannel {
TypeOom EventType = iota return &EventChannel{
TypeContainerCreation watchId: watchId,
TypeContainerDeletion channel: make(chan *info.Event, 10),
) }
// a general interface which populates the Event field EventData. The actual
// object, such as an OomInstance, is set as an Event's EventData
type EventDataInterface interface {
} }
// returns a pointer to an initialized Events object // returns a pointer to an initialized Events object
func NewEventManager() *events { func NewEventManager() *events {
return &events{ return &events{
eventlist: make(EventSlice, 0), eventlist: make(EventSlice, 0),
watchers: []*watch{}, watchers: make(map[int]*watch),
} }
} }
// returns a pointer to an initialized Request object // returns a pointer to an initialized Request object
func NewRequest() *Request { func NewRequest() *Request {
return &Request{ return &Request{
EventType: map[EventType]bool{}, EventType: map[info.EventType]bool{},
IncludeSubcontainers: false, IncludeSubcontainers: false,
MaxEventsReturned: 10,
} }
} }
// returns a pointer to an initialized watch object // returns a pointer to an initialized watch object
func newWatch(request *Request, outChannel chan *Event) *watch { func newWatch(request *Request, eventChannel *EventChannel) *watch {
return &watch{ return &watch{
request: request, request: request,
channel: outChannel, eventChannel: eventChannel,
} }
} }
func (self *EventChannel) GetChannel() chan *info.Event {
return self.channel
}
func (self *EventChannel) GetWatchId() int {
return self.watchId
}
// function necessary to implement the sort interface on the Events struct // function necessary to implement the sort interface on the Events struct
func (e EventSlice) Len() int { func (e EventSlice) Len() int {
return len(e) return len(e)
@ -180,7 +182,7 @@ func getMaxEventsReturned(request *Request, eSlice EventSlice) EventSlice {
// container path is a prefix of the event container path. Otherwise, // container path is a prefix of the event container path. Otherwise,
// it checks that the container paths of the event and request are // it checks that the container paths of the event and request are
// equivalent // equivalent
func checkIfIsSubcontainer(request *Request, event *Event) bool { func checkIfIsSubcontainer(request *Request, event *info.Event) bool {
if request.IncludeSubcontainers == true { if request.IncludeSubcontainers == true {
return strings.HasPrefix(event.ContainerName+"/", request.ContainerName+"/") return strings.HasPrefix(event.ContainerName+"/", request.ContainerName+"/")
} }
@ -188,7 +190,7 @@ func checkIfIsSubcontainer(request *Request, event *Event) bool {
} }
// determines if an event occurs within the time set in the request object and is the right type // determines if an event occurs within the time set in the request object and is the right type
func checkIfEventSatisfiesRequest(request *Request, event *Event) bool { func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool {
startTime := request.StartTime startTime := request.StartTime
endTime := request.EndTime endTime := request.EndTime
eventTime := event.Timestamp eventTime := event.Timestamp
@ -234,29 +236,30 @@ func (self *events) GetEvents(request *Request) (EventSlice, error) {
// Request object it is fed to the channel. The StartTime and EndTime of the watch // Request object it is fed to the channel. The StartTime and EndTime of the watch
// request should be uninitialized because the purpose is to watch indefinitely // request should be uninitialized because the purpose is to watch indefinitely
// for events that will happen in the future // for events that will happen in the future
func (self *events) WatchEvents(outChannel chan *Event, request *Request) error { func (self *events) WatchEvents(request *Request) (*EventChannel, error) {
if !request.StartTime.IsZero() || !request.EndTime.IsZero() { if !request.StartTime.IsZero() || !request.EndTime.IsZero() {
return errors.New( return nil, errors.New(
"for a call to watch, request.StartTime and request.EndTime must be uninitialized") "for a call to watch, request.StartTime and request.EndTime must be uninitialized")
} }
newWatcher := newWatch(request, outChannel)
self.watcherLock.Lock() self.watcherLock.Lock()
defer self.watcherLock.Unlock() defer self.watcherLock.Unlock()
self.watchers = append(self.watchers, newWatcher) new_id := self.lastId + 1
return nil returnEventChannel := NewEventChannel(new_id)
newWatcher := newWatch(request, returnEventChannel)
self.watchers[new_id] = newWatcher
self.lastId = new_id
return returnEventChannel, nil
} }
// helper function to update the event manager's eventlist // helper function to update the event manager's eventlist
func (self *events) updateEventList(e *Event) { func (self *events) updateEventList(e *info.Event) {
self.eventsLock.Lock() self.eventsLock.Lock()
defer self.eventsLock.Unlock() defer self.eventsLock.Unlock()
self.eventlist = append(self.eventlist, e) self.eventlist = append(self.eventlist, e)
} }
func (self *events) findValidWatchers(e *Event) []*watch { func (self *events) findValidWatchers(e *info.Event) []*watch {
watchesToSend := make([]*watch, 0) watchesToSend := make([]*watch, 0)
self.watcherLock.RLock()
defer self.watcherLock.RUnlock()
for _, watcher := range self.watchers { for _, watcher := range self.watchers {
watchRequest := watcher.request watchRequest := watcher.request
if checkIfEventSatisfiesRequest(watchRequest, e) { if checkIfEventSatisfiesRequest(watchRequest, e) {
@ -269,11 +272,25 @@ func (self *events) findValidWatchers(e *Event) []*watch {
// method of Events object that adds the argument Event object to the // method of Events object that adds the argument Event object to the
// eventlist. It also feeds the event to a set of watch channels // eventlist. It also feeds the event to a set of watch channels
// held by the manager if it satisfies the request keys of the channels // held by the manager if it satisfies the request keys of the channels
func (self *events) AddEvent(e *Event) error { func (self *events) AddEvent(e *info.Event) error {
self.updateEventList(e) self.updateEventList(e)
self.watcherLock.RLock()
defer self.watcherLock.RUnlock()
watchesToSend := self.findValidWatchers(e) watchesToSend := self.findValidWatchers(e)
for _, watchObject := range watchesToSend { for _, watchObject := range watchesToSend {
watchObject.channel <- e watchObject.eventChannel.GetChannel() <- e
} }
return nil return nil
} }
// Removes a watch instance from the EventManager's watchers map
func (self *events) StopWatch(watchId int) {
self.watcherLock.Lock()
defer self.watcherLock.Unlock()
_, ok := self.watchers[watchId]
if !ok {
glog.Errorf("Could not find watcher instance %v", watchId)
}
close(self.watchers[watchId].eventChannel.GetChannel())
delete(self.watchers, watchId)
}

View File

@ -17,6 +17,9 @@ package events
import ( import (
"testing" "testing"
"time" "time"
info "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
) )
func createOldTime(t *testing.T) time.Time { func createOldTime(t *testing.T) time.Time {
@ -31,16 +34,16 @@ func createOldTime(t *testing.T) time.Time {
} }
// used to convert an OomInstance to an Event object // used to convert an OomInstance to an Event object
func makeEvent(inTime time.Time, containerName string) *Event { func makeEvent(inTime time.Time, containerName string) *info.Event {
return &Event{ return &info.Event{
ContainerName: containerName, ContainerName: containerName,
Timestamp: inTime, Timestamp: inTime,
EventType: TypeOom, EventType: info.EventOom,
} }
} }
// returns EventManager and Request to use in tests // returns EventManager and Request to use in tests
func initializeScenario(t *testing.T) (*events, *Request, *Event, *Event) { func initializeScenario(t *testing.T) (*events, *Request, *info.Event, *info.Event) {
fakeEvent := makeEvent(createOldTime(t), "/") fakeEvent := makeEvent(createOldTime(t), "/")
fakeEvent2 := makeEvent(time.Now(), "/") fakeEvent2 := makeEvent(time.Now(), "/")
@ -54,7 +57,7 @@ func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived
} }
} }
func ensureProperEventReturned(t *testing.T, expectedEvent *Event, eventObjectFound *Event) { func ensureProperEventReturned(t *testing.T, expectedEvent *info.Event, eventObjectFound *info.Event) {
if eventObjectFound != expectedEvent { if eventObjectFound != expectedEvent {
t.Errorf("Expected to find test object %v but found a different object: %v", t.Errorf("Expected to find test object %v but found a different object: %v",
expectedEvent, eventObjectFound) expectedEvent, eventObjectFound)
@ -65,13 +68,13 @@ func TestCheckIfIsSubcontainer(t *testing.T) {
myRequest := NewRequest() myRequest := NewRequest()
myRequest.ContainerName = "/root" myRequest.ContainerName = "/root"
sameContainerEvent := &Event{ sameContainerEvent := &info.Event{
ContainerName: "/root", ContainerName: "/root",
} }
subContainerEvent := &Event{ subContainerEvent := &info.Event{
ContainerName: "/root/subdir", ContainerName: "/root/subdir",
} }
differentContainerEvent := &Event{ differentContainerEvent := &info.Event{
ContainerName: "/root-completely-different-container", ContainerName: "/root-completely-different-container",
} }
@ -102,9 +105,9 @@ func TestCheckIfIsSubcontainer(t *testing.T) {
func TestWatchEventsDetectsNewEvents(t *testing.T) { func TestWatchEventsDetectsNewEvents(t *testing.T) {
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
myRequest.EventType[TypeOom] = true myRequest.EventType[info.EventOom] = true
outChannel := make(chan *Event, 10) returnEventChannel, err := myEventHolder.WatchEvents(myRequest)
myEventHolder.WatchEvents(outChannel, myRequest) assert.Nil(t, err)
myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent)
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
@ -114,19 +117,17 @@ func TestWatchEventsDetectsNewEvents(t *testing.T) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
if time.Since(startTime) > (5 * time.Second) { if time.Since(startTime) > (5 * time.Second) {
t.Errorf("Took too long to receive all the events") t.Errorf("Took too long to receive all the events")
close(outChannel)
} }
}() }()
eventsFound := 0 eventsFound := 0
go func() { go func() {
for event := range outChannel { for event := range returnEventChannel.GetChannel() {
eventsFound += 1 eventsFound += 1
if eventsFound == 1 { if eventsFound == 1 {
ensureProperEventReturned(t, fakeEvent, event) ensureProperEventReturned(t, fakeEvent, event)
} else if eventsFound == 2 { } else if eventsFound == 2 {
ensureProperEventReturned(t, fakeEvent2, event) ensureProperEventReturned(t, fakeEvent2, event)
close(outChannel)
break break
} }
} }
@ -145,15 +146,13 @@ func TestAddEventAddsEventsToEventManager(t *testing.T) {
func TestGetEventsForOneEvent(t *testing.T) { func TestGetEventsForOneEvent(t *testing.T) {
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
myRequest.MaxEventsReturned = 1 myRequest.MaxEventsReturned = 1
myRequest.EventType[TypeOom] = true myRequest.EventType[info.EventOom] = true
myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent)
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
receivedEvents, err := myEventHolder.GetEvents(myRequest) receivedEvents, err := myEventHolder.GetEvents(myRequest)
if err != nil { assert.Nil(t, err)
t.Errorf("Failed to GetEvents: %v", err)
}
checkNumberOfEvents(t, 1, receivedEvents.Len()) checkNumberOfEvents(t, 1, receivedEvents.Len())
ensureProperEventReturned(t, fakeEvent2, receivedEvents[0]) ensureProperEventReturned(t, fakeEvent2, receivedEvents[0])
} }
@ -162,15 +161,13 @@ func TestGetEventsForTimePeriod(t *testing.T) {
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
myRequest.StartTime = createOldTime(t).Add(-1 * time.Second * 10) myRequest.StartTime = createOldTime(t).Add(-1 * time.Second * 10)
myRequest.EndTime = createOldTime(t).Add(time.Second * 10) myRequest.EndTime = createOldTime(t).Add(time.Second * 10)
myRequest.EventType[TypeOom] = true myRequest.EventType[info.EventOom] = true
myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent)
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
receivedEvents, err := myEventHolder.GetEvents(myRequest) receivedEvents, err := myEventHolder.GetEvents(myRequest)
if err != nil { assert.Nil(t, err)
t.Errorf("Failed to GetEvents: %v", err)
}
checkNumberOfEvents(t, 1, receivedEvents.Len()) checkNumberOfEvents(t, 1, receivedEvents.Len())
ensureProperEventReturned(t, fakeEvent, receivedEvents[0]) ensureProperEventReturned(t, fakeEvent, receivedEvents[0])
@ -183,8 +180,6 @@ func TestGetEventsForNoTypeRequested(t *testing.T) {
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
receivedEvents, err := myEventHolder.GetEvents(myRequest) receivedEvents, err := myEventHolder.GetEvents(myRequest)
if err != nil { assert.Nil(t, err)
t.Errorf("Failed to GetEvents: %v", err)
}
checkNumberOfEvents(t, 0, receivedEvents.Len()) checkNumberOfEvents(t, 0, receivedEvents.Len())
} }

View File

@ -473,3 +473,33 @@ func calculateCpuUsage(prev, cur uint64) uint64 {
} }
return cur - prev return cur - prev
} }
// Event contains information general to events such as the time at which they
// occurred, their specific type, and the actual event. Event types are
// differentiated by the EventType field of Event.
type Event struct {
// the absolute container name for which the event occurred
ContainerName string
// the time at which the event occurred
Timestamp time.Time
// the type of event. EventType is an enumerated type
EventType EventType
// the original event object and all of its extraneous data, ex. an
// OomInstance
EventData EventDataInterface
}
// EventType is an enumerated type which lists the categories under which
// events may fall. The Event field EventType is populated by this enum.
type EventType int
const (
EventOom EventType = iota
EventContainerCreation
EventContainerDeletion
)
// a general interface which populates the Event field EventData. The actual
// object, such as an OomInstance, is set as an Event's EventData
type EventDataInterface interface {
}

View File

@ -83,10 +83,12 @@ type Manager interface {
GetFsInfo(label string) ([]v2.FsInfo, error) GetFsInfo(label string) ([]v2.FsInfo, error)
// Get events streamed through passedChannel that fit the request. // Get events streamed through passedChannel that fit the request.
WatchForEvents(request *events.Request, passedChannel chan *events.Event) error WatchForEvents(request *events.Request) (*events.EventChannel, error)
// Get past events that have been detected and that fit the request. // Get past events that have been detected and that fit the request.
GetPastEvents(request *events.Request) (events.EventSlice, error) GetPastEvents(request *events.Request) ([]*info.Event, error)
CloseEventChannel(watch_id int)
} }
// New takes a memory storage and returns a new manager. // New takes a memory storage and returns a new manager.
@ -669,11 +671,11 @@ func (m *manager) createContainer(containerName string) error {
return err return err
} }
newEvent := &events.Event{ newEvent := &info.Event{
ContainerName: contRef.Name, ContainerName: contRef.Name,
EventData: contSpecs, EventData: contSpecs,
Timestamp: contSpecs.CreationTime, Timestamp: contSpecs.CreationTime,
EventType: events.TypeContainerCreation, EventType: info.EventContainerCreation,
} }
err = m.eventHandler.AddEvent(newEvent) err = m.eventHandler.AddEvent(newEvent)
if err != nil { if err != nil {
@ -721,10 +723,10 @@ func (m *manager) destroyContainer(containerName string) error {
return err return err
} }
newEvent := &events.Event{ newEvent := &info.Event{
ContainerName: contRef.Name, ContainerName: contRef.Name,
Timestamp: time.Now(), Timestamp: time.Now(),
EventType: events.TypeContainerDeletion, EventType: info.EventContainerDeletion,
} }
err = m.eventHandler.AddEvent(newEvent) err = m.eventHandler.AddEvent(newEvent)
if err != nil { if err != nil {
@ -868,22 +870,20 @@ func (self *manager) watchForNewOoms() error {
if err != nil { if err != nil {
return err return err
} }
err = oomLog.StreamOoms(outStream) go oomLog.StreamOoms(outStream)
if err != nil {
return err
}
go func() { go func() {
for oomInstance := range outStream { for oomInstance := range outStream {
newEvent := &events.Event{ newEvent := &info.Event{
ContainerName: oomInstance.ContainerName, ContainerName: oomInstance.ContainerName,
Timestamp: oomInstance.TimeOfDeath, Timestamp: oomInstance.TimeOfDeath,
EventType: events.TypeOom, EventType: info.EventOom,
EventData: oomInstance, EventData: oomInstance,
} }
glog.V(1).Infof("Created an oom event: %v", newEvent) glog.V(1).Infof("Created an oom event: %v", newEvent)
err := self.eventHandler.AddEvent(newEvent) err := self.eventHandler.AddEvent(newEvent)
if err != nil { if err != nil {
glog.Errorf("Failed to add event %v, got error: %v", newEvent, err) glog.Errorf("failed to add event %v, got error: %v", newEvent, err)
} }
} }
}() }()
@ -891,11 +891,16 @@ func (self *manager) watchForNewOoms() error {
} }
// can be called by the api which will take events returned on the channel // can be called by the api which will take events returned on the channel
func (self *manager) WatchForEvents(request *events.Request, passedChannel chan *events.Event) error { func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) {
return self.eventHandler.WatchEvents(passedChannel, request) return self.eventHandler.WatchEvents(request)
} }
// can be called by the api which will return all events satisfying the request // can be called by the api which will return all events satisfying the request
func (self *manager) GetPastEvents(request *events.Request) (events.EventSlice, error) { func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, error) {
return self.eventHandler.GetEvents(request) return self.eventHandler.GetEvents(request)
} }
// called by the api when a client is no longer listening to the channel
func (self *manager) CloseEventChannel(watch_id int) {
self.eventHandler.StopWatch(watch_id)
}

View File

@ -70,14 +70,14 @@ func (c *ManagerMock) GetRequestedContainersInfo(containerName string, options v
return args.Get(0).(map[string]*info.ContainerInfo), args.Error(1) return args.Get(0).(map[string]*info.ContainerInfo), args.Error(1)
} }
func (c *ManagerMock) WatchForEvents(queryuest *events.Request, passedChannel chan *events.Event) error { func (c *ManagerMock) WatchForEvents(queryuest *events.Request, passedChannel chan *info.Event) error {
args := c.Called(queryuest, passedChannel) args := c.Called(queryuest, passedChannel)
return args.Error(0) return args.Error(0)
} }
func (c *ManagerMock) GetPastEvents(queryuest *events.Request) (events.EventSlice, error) { func (c *ManagerMock) GetPastEvents(queryuest *events.Request) ([]*info.Event, error) {
args := c.Called(queryuest) args := c.Called(queryuest)
return args.Get(0).(events.EventSlice), args.Error(1) return args.Get(0).([]*info.Event), args.Error(1)
} }
func (c *ManagerMock) GetMachineInfo() (*info.MachineInfo, error) { func (c *ManagerMock) GetMachineInfo() (*info.MachineInfo, error) {

View File

@ -31,10 +31,7 @@ func main() {
if err != nil { if err != nil {
glog.Infof("Couldn't make a new oomparser. %v", err) glog.Infof("Couldn't make a new oomparser. %v", err)
} else { } else {
err := oomLog.StreamOoms(outStream) go oomLog.StreamOoms(outStream)
if err != nil {
glog.Errorf("%v", err)
}
// demonstration of how to get oomLog's list of oomInstances or access // demonstration of how to get oomLog's list of oomInstances or access
// the user-declared oomInstance channel, here called outStream // the user-declared oomInstance channel, here called outStream
for oomInstance := range outStream { for oomInstance := range outStream {

View File

@ -16,9 +16,10 @@ package oomparser
import ( import (
"bufio" "bufio"
"fmt" "errors"
"io" "io"
"os" "os"
"os/exec"
"path" "path"
"regexp" "regexp"
"strconv" "strconv"
@ -37,7 +38,7 @@ var firstLineRegexp *regexp.Regexp = regexp.MustCompile(
// struct to hold file from which we obtain OomInstances // struct to hold file from which we obtain OomInstances
type OomParser struct { type OomParser struct {
systemFile string ioreader *bufio.Reader
} }
// struct that contains information related to an OOM kill instance // struct that contains information related to an OOM kill instance
@ -123,19 +124,18 @@ func readLinesFromFile(lineChannel chan string, ioreader *bufio.Reader) {
} }
} }
// Calls goroutine for analyzeLinesHelper, which feeds it complete lines. // Calls goroutine for readLinesFromFile, which feeds it complete lines.
// Lines are checked against a regexp to check for the pid, process name, etc. // Lines are checked against a regexp to check for the pid, process name, etc.
// At the end of an oom message group, AnalyzeLines adds the new oomInstance to // At the end of an oom message group, StreamOoms adds the new oomInstance to
// oomLog // oomLog
func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) { func (self *OomParser) StreamOoms(outStream chan *OomInstance) {
lineChannel := make(chan string, 10) lineChannel := make(chan string, 10)
go func() { go func() {
readLinesFromFile(lineChannel, ioreader) readLinesFromFile(lineChannel, self.ioreader)
}() }()
for line := range lineChannel { for line := range lineChannel {
in_oom_kernel_log := checkIfStartOfOomMessages(line) in_oom_kernel_log := checkIfStartOfOomMessages(line)
if in_oom_kernel_log { if in_oom_kernel_log {
oomCurrentInstance := &OomInstance{ oomCurrentInstance := &OomInstance{
ContainerName: "/", ContainerName: "/",
@ -153,12 +153,37 @@ func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomI
line = <-lineChannel line = <-lineChannel
} }
in_oom_kernel_log = false in_oom_kernel_log = false
glog.V(1).Infof("Sending an oomInstance: %v", oomCurrentInstance)
outStream <- oomCurrentInstance outStream <- oomCurrentInstance
} }
} }
glog.Infof("exiting analyzeLines") glog.Infof("exiting analyzeLines")
} }
func callJournalctl() (io.ReadCloser, error) {
cmd := exec.Command("journalctl", "-f")
readcloser, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
return readcloser, err
}
func trySystemd() (*OomParser, error) {
readcloser, err := callJournalctl()
if err != nil {
return nil, err
}
glog.V(1).Infof("oomparser using systemd")
return &OomParser{
ioreader: bufio.NewReader(readcloser),
}, nil
}
// looks for system files that contain kernel messages and if one is found, sets // looks for system files that contain kernel messages and if one is found, sets
// the systemFile attribute of the OomParser object // the systemFile attribute of the OomParser object
func getSystemFile() (string, error) { func getSystemFile() (string, error) {
@ -169,37 +194,23 @@ func getSystemFile() (string, error) {
} else if utils.FileExists(varLogSyslog) { } else if utils.FileExists(varLogSyslog) {
return varLogSyslog, nil return varLogSyslog, nil
} }
return "", fmt.Errorf("neither %s nor %s exists from which to read kernel errors", varLogMessages, varLogSyslog) return "", errors.New("neither " + varLogSyslog + " nor " + varLogMessages + " exists from which to read kernel errors")
}
// calls a go routine that populates self.OomInstances and fills the argument
// channel with OomInstance objects as they are read from the file.
// opens the OomParser's systemFile which was set in getSystemFile
// to look for OOM messages by calling AnalyzeLines. Takes in the argument
// outStream, which is passed in by the user and passed to AnalyzeLines.
// OomInstance objects are added to outStream when they are found by
// AnalyzeLines
func (self *OomParser) StreamOoms(outStream chan *OomInstance) error {
file, err := os.Open(self.systemFile)
if err != nil {
return err
}
ioreader := bufio.NewReader(file)
// Process the events received from the kernel.
go func() {
self.analyzeLines(ioreader, outStream)
}()
return nil
} }
// initializes an OomParser object and calls getSystemFile to set the systemFile // initializes an OomParser object and calls getSystemFile to set the systemFile
// attribute. Returns and OomParser object and an error // attribute. Returns and OomParser object and an error
func New() (*OomParser, error) { func New() (*OomParser, error) {
systemFileName, err := getSystemFile() systemFile, err := getSystemFile()
if err != nil { if err != nil {
return nil, err glog.V(1).Infof("received error %v when calling getSystemFile", err)
return trySystemd()
}
file, err := os.Open(systemFile)
if err != nil {
glog.V(1).Infof("received error %v when opening file", err)
return trySystemd()
} }
return &OomParser{ return &OomParser{
systemFile: systemFileName}, nil ioreader: bufio.NewReader(file),
}, nil
} }

View File

@ -116,44 +116,6 @@ func TestCheckIfStartOfMessages(t *testing.T) {
} }
} }
func TestAnalyzeLinesContainerOom(t *testing.T) {
expectedContainerOomInstance := createExpectedContainerOomInstance(t)
helpTestAnalyzeLines(expectedContainerOomInstance, containerLogFile, t)
}
func TestAnalyzeLinesSystemOom(t *testing.T) {
expectedSystemOomInstance := createExpectedSystemOomInstance(t)
helpTestAnalyzeLines(expectedSystemOomInstance, systemLogFile, t)
}
func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *testing.T) {
outStream := make(chan *OomInstance)
oomLog := new(OomParser)
oomLog.systemFile = sysFile
file, err := os.Open(oomLog.systemFile)
if err != nil {
t.Errorf("couldn't open test log: %v", err)
}
ioreader := bufio.NewReader(file)
timeout := make(chan bool, 1)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
go oomLog.analyzeLines(ioreader, outStream)
select {
case oomInstance := <-outStream:
if *oomCheckInstance != *oomInstance {
t.Errorf("wrong instance returned. Expected %v and got %v",
oomCheckInstance, oomInstance)
t.Errorf("Container of one was %v and the other %v", oomCheckInstance.ContainerName, oomInstance.ContainerName)
}
case <-timeout:
t.Error(
"timeout happened before oomInstance was found in test file")
}
}
func TestStreamOomsContainer(t *testing.T) { func TestStreamOomsContainer(t *testing.T) {
expectedContainerOomInstance := createExpectedContainerOomInstance(t) expectedContainerOomInstance := createExpectedContainerOomInstance(t)
helpTestStreamOoms(expectedContainerOomInstance, containerLogFile, t) helpTestStreamOoms(expectedContainerOomInstance, containerLogFile, t)
@ -166,18 +128,14 @@ func TestStreamOomsSystem(t *testing.T) {
func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testing.T) { func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testing.T) {
outStream := make(chan *OomInstance) outStream := make(chan *OomInstance)
oomLog := new(OomParser) oomLog := mockOomParser(sysFile, t)
oomLog.systemFile = sysFile
timeout := make(chan bool, 1) timeout := make(chan bool, 1)
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
timeout <- true timeout <- true
}() }()
err := oomLog.StreamOoms(outStream) go oomLog.StreamOoms(outStream)
if err != nil {
t.Errorf("had an error opening file: %v", err)
}
select { select {
case oomInstance := <-outStream: case oomInstance := <-outStream:
@ -191,9 +149,12 @@ func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testin
} }
} }
func TestNew(t *testing.T) { func mockOomParser(sysFile string, t *testing.T) *OomParser {
_, err := New() file, err := os.Open(sysFile)
if err != nil { if err != nil {
t.Errorf("function New() had error %v", err) t.Errorf("had an error opening file: %v", err)
}
return &OomParser{
ioreader: bufio.NewReader(file),
} }
} }

View File

@ -15,4 +15,4 @@
package version package version
// Version of cAdvisor. // Version of cAdvisor.
const VERSION = "0.10.1" const VERSION = "0.11.0"