memlogd: use kmsg format for reading the logs

Switch to a more formally-specified `kmsg`-style format for reading
the logs.

- update the spec in docs/logging.md
- check for bad names in pkg/memlogd with unit test

Signed-off-by: David Scott <dave.scott@docker.com>
This commit is contained in:
David Scott
2018-07-05 14:24:37 +01:00
parent fe64f33430
commit a07ecf60d5
3 changed files with 131 additions and 43 deletions

View File

@@ -66,7 +66,7 @@ func logQueryHandler(l *connListener) {
}
func (msg *logEntry) String() string {
return fmt.Sprintf("%s %s %s", msg.time.Format(time.RFC3339), msg.source, msg.msg)
return fmt.Sprintf("%s,%s;%s", msg.time.Format(time.RFC3339), msg.source, msg.msg)
}
func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
@@ -220,6 +220,23 @@ func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) {
}
}
func loggingRequestHandler(lineMaxLength int, logCh chan logEntry, fdMsgChan chan fdMessage) {
for true {
select {
case msg := <-fdMsgChan: // incoming fd
if strings.Contains(msg.name, ";") {
// The log message spec bans ";" in the log names
doLog(logCh, fmt.Sprintf("ERROR: cannot register log with name '%s' as it contains ;", msg.name))
if err := syscall.Close(msg.fd); err != nil {
doLog(logCh, fmt.Sprintf("ERROR: failed to close fd: %s", err))
}
continue
}
go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh)
}
}
}
func main() {
var err error
@@ -304,16 +321,14 @@ func main() {
fdMsgChan := make(chan fdMessage)
queryMsgChan := make(chan queryMessage)
// receive fds from the logging Unix domain socket and send on fdMsgChan
go receiveFdHandler(connLogFd, logCh, fdMsgChan)
// receive fds from the querying Unix domain socket and send on queryMsgChan
go receiveQueryHandler(connQuery, logCh, queryMsgChan)
// process both log messages and queries
go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan)
doLog(logCh, "memlogd started")
for true {
select {
case msg := <-fdMsgChan: // incoming fd
go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh)
}
}
loggingRequestHandler(lineMaxLength, logCh, fdMsgChan)
}

View File

@@ -6,6 +6,7 @@ import (
"log"
"net"
"os"
"strings"
"syscall"
"testing"
"time"
@@ -44,7 +45,7 @@ func TestFinite(t *testing.T) {
for i := 0; i < 2*linesInBuffer; i++ {
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite"}
}
a, b := socketpair()
a, b := loopback()
defer a.Close()
defer b.Close()
queryM := queryMessage{
@@ -85,7 +86,7 @@ func TestFinite2(t *testing.T) {
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite2"}
}
a, b := socketpair()
a, b := loopback()
defer a.Close()
defer b.Close()
queryM := queryMessage{
@@ -112,22 +113,86 @@ func TestFinite2(t *testing.T) {
}
}
func socketpair() (net.Conn, net.Conn) {
func TestGoodName(t *testing.T) {
// Test that the source names can't contain ";"
linesInBuffer := 10
logCh := make(chan logEntry)
fdMsgChan := make(chan fdMessage)
queryMsgChan := make(chan queryMessage)
go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan)
go loggingRequestHandler(80, logCh, fdMsgChan)
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
if err != nil {
log.Fatal("Unable to create socketpair: ", err)
}
f := os.NewFile(uintptr(fds[0]), "a")
a, err := net.FileConn(f)
if err != nil {
log.Fatal("Unable to create net.Conn from socketpair: ", err)
a := fdToConn(fds[0])
b := fdToConn(fds[1])
defer a.Close()
defer b.Close()
// defer close fds
fdMsgChan <- fdMessage{
name: "semi-colons are banned;",
fd: fds[0],
}
_ = f.Close()
f = os.NewFile(uintptr(fds[1]), "b")
b, err := net.FileConn(f)
if err != nil {
log.Fatal("Unable to create net.Conn from socketpair: ", err)
// although the fd should be rejected my memlogd the Write should be buffered
// by the kernel and not block.
if _, err := b.Write([]byte("hello\n")); err != nil {
log.Fatalf("Failed to write log message: %s", err)
}
c, d := loopback()
defer c.Close()
defer d.Close()
// this log should not be in the ring because the connection was rejected.
queryM := queryMessage{
conn: c,
mode: logDumpFollow,
}
queryMsgChan <- queryM
// The error log is generated asynchronously. It should be fast. On error time out
// after 5s.
d.SetDeadline(time.Now().Add(5 * time.Second))
r := bufio.NewReader(d)
for {
line, err := r.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Unexpected error reading from socket: %s", err)
}
if strings.Contains(line, "ERROR: cannot register log") {
return
}
}
t.Fatal("Failed to read error message when registering a log with a ;")
}
// caller must close fd themselves: closing the net.Conn will not close fd.
func fdToConn(fd int) net.Conn {
f := os.NewFile(uintptr(fd), "")
c, err := net.FileConn(f)
if err != nil {
log.Fatal("Unable to create net.Conn from file descriptor: ", err)
}
return c
}
func loopback() (net.Conn, net.Conn) {
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
if err != nil {
log.Fatal("Unable to create socketpair: ", err)
}
a := fdToConn(fds[0])
b := fdToConn(fds[1])
// net.Conns are independent of the fds, so we must close the fds now.
if err := syscall.Close(fds[0]); err != nil {
log.Fatal("Unable to close socketpair fd: ", err)
}
if err := syscall.Close(fds[1]); err != nil {
log.Fatal("Unable to close socketpair fd: ", err)
}
_ = f.Close()
return a, b
}