From 6aca715ad84590cc9112714afe5b96ec3f68e236 Mon Sep 17 00:00:00 2001 From: David Scott Date: Thu, 5 Jul 2018 14:07:06 +0100 Subject: [PATCH] memlogd: replace an unbounded buffer with bounded channel Previously we had a per-connection bytes.Buffer // to be written to the connection sync.Cond // to allow us to Wait for more data This had the major disadvantage that the buffer was unbounded and so a slow client could cause memory exhaustion in the server. This patch replaces these with a single chan *logEntry which is naturally bounded and supports blocking read. We make write non-blocking using select i.e. we drop messages rather than allocate more space. Signed-off-by: David Scott --- pkg/memlogd/cmd/memlogd/main.go | 90 ++++++++++++--------------------- 1 file changed, 32 insertions(+), 58 deletions(-) diff --git a/pkg/memlogd/cmd/memlogd/main.go b/pkg/memlogd/cmd/memlogd/main.go index 6f92ed0d6..e2c0d7584 100644 --- a/pkg/memlogd/cmd/memlogd/main.go +++ b/pkg/memlogd/cmd/memlogd/main.go @@ -12,7 +12,7 @@ import ( "net" "os" "os/exec" - "sync" + "strings" "syscall" "time" ) @@ -43,8 +43,7 @@ type queryMessage struct { type connListener struct { conn net.Conn - cond *sync.Cond // condition and mutex used to notify listeners of more data - buffer bytes.Buffer + output chan *logEntry err error exitOnEOF bool // exit instead of blocking if no more data in read buffer } @@ -57,49 +56,12 @@ func doLog(logCh chan logEntry, msg string) { func logQueryHandler(l *connListener) { defer l.conn.Close() - data := make([]byte, 0xffff) - - l.cond.L.Lock() - for { - var n, remaining int - var rerr, werr error - - for rerr == nil && werr == nil { - if n, rerr = l.buffer.Read(data); n == 0 { // process data before checking error - break // exit read loop to wait for more data - } - l.cond.L.Unlock() - - remaining = n - w := data - for remaining > 0 && werr == nil { - w = data[:remaining] - n, werr = l.conn.Write(w) - w = w[n:] - remaining = remaining - n - } - - l.cond.L.Lock() + for msg := range l.output { + _, err := io.Copy(l.conn, strings.NewReader(msg.String()+"\n")) + if err != nil { + l.err = err + return } - - // check errors - if werr != nil { - l.err = werr - l.cond.L.Unlock() - break - } - - if rerr != nil && rerr != io.EOF { // EOF is ok, just wait for more data - l.err = rerr - l.cond.L.Unlock() - break - } - if l.exitOnEOF && rerr == io.EOF { // ... unless we should exit on EOF - l.err = nil - l.cond.L.Unlock() - break - } - l.cond.Wait() // unlock and wait for more data } } @@ -107,7 +69,7 @@ func (msg *logEntry) String() string { return fmt.Sprintf("%s %s %s", msg.time.Format(time.RFC3339), msg.source, msg.msg) } -func ringBufferHandler(ringSize int, logCh chan logEntry, queryMsgChan chan queryMessage) { +func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan chan queryMessage) { // Anything that interacts with the ring buffer goes through this handler ring := ring.New(ringSize) listeners := list.New() @@ -129,10 +91,11 @@ func ringBufferHandler(ringSize int, logCh chan logEntry, queryMsgChan chan quer remove = append(remove, e) continue } - l.cond.L.Lock() - l.buffer.WriteString(fmt.Sprintf("%s\n", msg.String())) - l.cond.L.Unlock() - l.cond.Signal() + select { + case l.output <- &msg: + default: + // channel is full so drop message + } } if len(remove) > 0 { // remove listeners that returned errors for _, e := range remove { @@ -143,20 +106,31 @@ func ringBufferHandler(ringSize int, logCh chan logEntry, queryMsgChan chan quer } case msg := <-queryMsgChan: - l := connListener{conn: msg.conn, cond: sync.NewCond(&sync.Mutex{}), err: nil, exitOnEOF: (msg.mode == logDump)} - listeners.PushBack(&l) + l := connListener{ + conn: msg.conn, + output: make(chan *logEntry, chanSize), + err: nil, + exitOnEOF: (msg.mode == logDump), + } go logQueryHandler(&l) + if msg.mode == logDumpFollow || msg.mode == logFollow { + // register for future logs + listeners.PushBack(&l) + } if msg.mode == logDumpFollow || msg.mode == logDump { - l.cond.L.Lock() // fill with current data in buffer ring.Do(func(f interface{}) { if msg, ok := f.(logEntry); ok { - s := fmt.Sprintf("%s\n", msg.String()) - l.buffer.WriteString(s) + select { + case l.output <- &msg: + default: + // channel is full so drop message + } } }) - l.cond.L.Unlock() - l.cond.Signal() // signal handler that more data is available + } + if msg.mode == logDump { + close(l.output) } } } @@ -332,7 +306,7 @@ func main() { go receiveFdHandler(connLogFd, logCh, fdMsgChan) go receiveQueryHandler(connQuery, logCh, queryMsgChan) - go ringBufferHandler(linesInBuffer, logCh, queryMsgChan) + go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan) doLog(logCh, "memlogd started")