memlogd: replace an unbounded buffer with bounded channel

Previously we had a per-connection

  bytes.Buffer // to be written to the connection
  sync.Cond    // to allow us to Wait for more data

This had the major disadvantage that the buffer was unbounded and so
a slow client could cause memory exhaustion in the server. This patch
replaces these with a single

  chan *logEntry

which is naturally bounded and supports blocking reads. We make writes
non-blocking using select, i.e. we drop messages rather than allocating
more space.
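To illustrate the pattern outside memlogd, here is a minimal, self-contained
sketch (hypothetical names and capacity, not the memlogd code itself) of a
bounded channel with a blocking reader and a non-blocking, drop-on-full writer:

  package main

  import (
      "fmt"
      "time"
  )

  func main() {
      // Bounded channel: at most 2 entries are ever queued.
      output := make(chan string, 2)

      done := make(chan struct{})
      go func() {
          // Blocking read: range waits for entries and exits when
          // the channel is closed.
          for msg := range output {
              fmt.Println(msg)
              time.Sleep(50 * time.Millisecond) // simulate a slow client
          }
          close(done)
      }()

      // Non-blocking write: if the channel is full, drop the message
      // instead of growing an unbounded buffer.
      for i := 0; i < 10; i++ {
          select {
          case output <- fmt.Sprintf("message %d", i):
          default:
              // channel is full so drop message
          }
      }
      close(output)
      <-done
  }

With a slow reader most of the ten messages are dropped, but the writer never
blocks and memory use stays bounded by the channel capacity.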

Signed-off-by: David Scott <dave.scott@docker.com>
David Scott 2018-07-05 14:07:06 +01:00
parent 3e742018d6
commit 6aca715ad8


@@ -12,7 +12,7 @@ import (
 	"net"
 	"os"
 	"os/exec"
-	"sync"
+	"strings"
 	"syscall"
 	"time"
 )
@@ -43,8 +43,7 @@ type queryMessage struct {
 type connListener struct {
 	conn      net.Conn
-	cond      *sync.Cond // condition and mutex used to notify listeners of more data
-	buffer    bytes.Buffer
+	output    chan *logEntry
 	err       error
 	exitOnEOF bool // exit instead of blocking if no more data in read buffer
 }
@@ -57,49 +56,12 @@ func doLog(logCh chan logEntry, msg string) {
 func logQueryHandler(l *connListener) {
 	defer l.conn.Close()
-	data := make([]byte, 0xffff)
-
-	l.cond.L.Lock()
-	for {
-		var n, remaining int
-		var rerr, werr error
-		for rerr == nil && werr == nil {
-			if n, rerr = l.buffer.Read(data); n == 0 { // process data before checking error
-				break // exit read loop to wait for more data
-			}
-			l.cond.L.Unlock()
-			remaining = n
-			w := data
-			for remaining > 0 && werr == nil {
-				w = data[:remaining]
-				n, werr = l.conn.Write(w)
-				w = w[n:]
-				remaining = remaining - n
-			}
-			l.cond.L.Lock()
-		}
-		// check errors
-		if werr != nil {
-			l.err = werr
-			l.cond.L.Unlock()
-			break
-		}
-		if rerr != nil && rerr != io.EOF { // EOF is ok, just wait for more data
-			l.err = rerr
-			l.cond.L.Unlock()
-			break
-		}
-		if l.exitOnEOF && rerr == io.EOF { // ... unless we should exit on EOF
-			l.err = nil
-			l.cond.L.Unlock()
-			break
-		}
-		l.cond.Wait() // unlock and wait for more data
+	for msg := range l.output {
+		_, err := io.Copy(l.conn, strings.NewReader(msg.String()+"\n"))
+		if err != nil {
+			l.err = err
+			return
+		}
 	}
 }
@@ -107,7 +69,7 @@ func (msg *logEntry) String() string {
 	return fmt.Sprintf("%s %s %s", msg.time.Format(time.RFC3339), msg.source, msg.msg)
 }
 
-func ringBufferHandler(ringSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
+func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
 	// Anything that interacts with the ring buffer goes through this handler
 	ring := ring.New(ringSize)
 	listeners := list.New()
@@ -129,10 +91,11 @@ func ringBufferHandler(ringSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
 				remove = append(remove, e)
 				continue
 			}
-			l.cond.L.Lock()
-			l.buffer.WriteString(fmt.Sprintf("%s\n", msg.String()))
-			l.cond.L.Unlock()
-			l.cond.Signal()
+			select {
+			case l.output <- &msg:
+			default:
+				// channel is full so drop message
+			}
 		}
 		if len(remove) > 0 { // remove listeners that returned errors
 			for _, e := range remove {
@@ -143,20 +106,31 @@ func ringBufferHandler(ringSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
 			}
 		case msg := <-queryMsgChan:
-			l := connListener{conn: msg.conn, cond: sync.NewCond(&sync.Mutex{}), err: nil, exitOnEOF: (msg.mode == logDump)}
-			listeners.PushBack(&l)
+			l := connListener{
+				conn:      msg.conn,
+				output:    make(chan *logEntry, chanSize),
+				err:       nil,
+				exitOnEOF: (msg.mode == logDump),
+			}
 			go logQueryHandler(&l)
+			if msg.mode == logDumpFollow || msg.mode == logFollow {
+				// register for future logs
+				listeners.PushBack(&l)
+			}
 			if msg.mode == logDumpFollow || msg.mode == logDump {
-				l.cond.L.Lock()
 				// fill with current data in buffer
 				ring.Do(func(f interface{}) {
 					if msg, ok := f.(logEntry); ok {
-						s := fmt.Sprintf("%s\n", msg.String())
-						l.buffer.WriteString(s)
+						select {
+						case l.output <- &msg:
+						default:
+							// channel is full so drop message
+						}
 					}
 				})
-				l.cond.L.Unlock()
-				l.cond.Signal() // signal handler that more data is available
+			}
+			if msg.mode == logDump {
+				close(l.output)
 			}
 		}
 	}
@@ -332,7 +306,7 @@ func main() {
 	go receiveFdHandler(connLogFd, logCh, fdMsgChan)
 	go receiveQueryHandler(connQuery, logCh, queryMsgChan)
 
-	go ringBufferHandler(linesInBuffer, logCh, queryMsgChan)
+	go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan)
 
 	doLog(logCh, "memlogd started")